Cross library build for Spark 2/3 (#421)

Chungmin Lee 2021-04-28 15:14:52 +09:00 committed by GitHub
Parent ad4b3af899
Commit 4f8f24b9bc
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
74 changed files with 1336 additions and 926 deletions


@ -21,6 +21,20 @@ provided by the bot. You will only need to do this once across all repos using o
Please review our [contribution guide](CONTRIBUTING.md).
### Development on Windows
This repository contains symbolic links which don't work properly on Windows. To build this project on Windows, you can use our provided Git aliases to replace symbolic links with junctions.
```sh
$ git config --local include.path ../dev/.gitconfig
$ git replace-symlinks # replace symlinks with junctions
$ git restore-symlinks # restore symlinks
```
### Using IntelliJ
You can use the built-in sbt shell in IntelliJ without any problems. However, the built-in "Build Project" command may not work. To fix the issue, go to Project Structure -> Project Settings -> Modules, remove the "root" module, mark `src/main/scala` as Sources for both the "spark2_4" and "spark3_0" modules, and mark `src/main/scala-spark2` and `src/main/scala-spark3` as Sources for their corresponding modules.
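If the IDE build still misbehaves, the same per-Spark modules can be built and tested from the command line; the following mirrors what the CI pipeline runs (the Scala versions shown are the ones currently used for each profile):
```sh
$ sbt ++2.11.12 "project spark2_4" clean update compile test
$ sbt ++2.12.8 "project spark3_0" clean update compile test
```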
## Inspiration and Special Thanks
This project would not have been possible without the outstanding work from the following communities:


@ -6,8 +6,8 @@ trigger:
- master
jobs:
- job: Build_2_11
displayName: 'Build sources and run unit tests for Scala 2.11'
- job: Build_Spark2_4_2_11
displayName: 'Build sources and run unit tests for Spark 2.4 / Scala 2.11'
pool:
vmImage: 'ubuntu-18.04'
steps:
@ -17,69 +17,69 @@ jobs:
versionSpec: '8'
jdkArchitectureOption: 'x64'
jdkSourceOption: 'PreInstalled'
# Use sbt 1.4.9. The default sbt launcher in ubuntu-18.04 20210405 image is
# 1.5.0, but the version has an issue to compile with 0.13.18.
# See: https://github.com/sbt/sbt/issues/6447
- script: wget -O /tmp/sbt.tgz "https://github.com/sbt/sbt/releases/download/v1.4.9/sbt-1.4.9.tgz"
displayName: 'Download sbt 1.4.9'
- script: tar zxf /tmp/sbt.tgz -C /tmp/
displayName: 'Extract sbt'
- script: /tmp/sbt//bin/sbt ++2.11.12 clean
displayName: 'Running $sbt clean'
- script: /tmp/sbt/bin/sbt ++2.11.12 update
displayName: 'Running $sbt update'
- script: /tmp/sbt/bin/sbt ++2.11.12 compile
displayName: 'Running $sbt compile'
- script: /tmp/sbt/bin/sbt ++2.11.12 test
displayName: 'Running $sbt test'
- script: sbt ++2.11.12 "project spark2_4" clean update compile test
displayName: 'Running $sbt clean & update & compile & test'
# If not a pull request, publish artifacts.
- ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}:
- script: /tmp/sbt/bin/sbt ++2.11.12 package
- script: sbt ++2.11.12 "project spark2_4" package
displayName: 'Running $sbt package'
- task: CopyFiles@2
displayName: 'Copy hyperspace-core JAR'
inputs:
sourceFolder: '$(Build.SourcesDirectory)/target/'
contents: '**/*.jar'
targetFolder: '$(Build.ArtifactStagingDirectory)/hyperspace-core/'
targetFolder: '$(Build.ArtifactStagingDirectory)/hyperspace-core_spark2.4/'
- task: PublishBuildArtifacts@1
displayName: 'Publish Hyperspace artifacts'
inputs:
artifactName: 'hyperspace-core'
pathtoPublish: '$(Build.ArtifactStagingDirectory)/hyperspace-core/'
artifactName: 'hyperspace-core_spark2.4'
pathtoPublish: '$(Build.ArtifactStagingDirectory)/hyperspace-core_spark2.4/'
- job: Build_2_12
displayName: 'Build sources and run unit tests for Scala 2.12'
- job: Build_Spark2_4_2_12
displayName: 'Build sources and run unit tests for Spark 2.4 / Scala 2.12'
pool:
vmImage: 'ubuntu-18.04'
steps:
- script: wget -O /tmp/sbt.tgz "https://github.com/sbt/sbt/releases/download/v1.4.9/sbt-1.4.9.tgz"
displayName: 'Download sbt 1.4.9'
- script: tar zxf /tmp/sbt.tgz -C /tmp/
displayName: 'Extract sbt'
- script: /tmp/sbt/bin/sbt ++2.12.8 clean
displayName: 'Running $sbt clean'
- script: /tmp/sbt/bin/sbt ++2.12.8 update
displayName: 'Running $sbt update'
- script: /tmp/sbt/bin/sbt ++2.12.8 compile
displayName: 'Running $sbt compile'
- script: /tmp/sbt/bin/sbt ++2.12.8 test
displayName: 'Running $sbt test'
- script: sbt ++2.12.8 "project spark2_4" clean update compile test
displayName: 'Running $sbt clean & update & compile & test'
# If not a pull request, publish artifacts.
- ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}:
- script: /tmp/sbt/bin/sbt ++2.12.8 package
- script: sbt ++2.12.8 "project spark2_4" package
displayName: 'Running $sbt package'
- task: CopyFiles@2
displayName: 'Copy hyperspace-core JAR'
inputs:
sourceFolder: '$(Build.SourcesDirectory)/target/'
contents: '**/*.jar'
targetFolder: '$(Build.ArtifactStagingDirectory)/hyperspace-core/'
targetFolder: '$(Build.ArtifactStagingDirectory)/hyperspace-core_spark2.4/'
- task: PublishBuildArtifacts@1
displayName: 'Publish Hyperspace artifacts'
inputs:
artifactName: 'hyperspace-core'
pathtoPublish: '$(Build.ArtifactStagingDirectory)/hyperspace-core/'
artifactName: 'hyperspace-core_spark2.4'
pathtoPublish: '$(Build.ArtifactStagingDirectory)/hyperspace-core_spark2.4/'
- job: Build_Spark3_0_2_12
displayName: 'Build sources and run unit tests for Spark 3.0 / Scala 2.12'
pool:
vmImage: 'ubuntu-18.04'
steps:
- script: sbt ++2.12.8 "project spark3_0" clean update compile test
displayName: 'Running $sbt clean & update & compile & test'
# If not a pull request, publish artifacts.
- ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}:
- script: sbt ++2.12.8 "project spark3_0" package
displayName: 'Running $sbt package'
- task: CopyFiles@2
displayName: 'Copy hyperspace-core JAR'
inputs:
sourceFolder: '$(Build.SourcesDirectory)/target/'
contents: '**/*.jar'
targetFolder: '$(Build.ArtifactStagingDirectory)/hyperspace-core_spark3.0/'
- task: PublishBuildArtifacts@1
displayName: 'Publish Hyperspace artifacts'
inputs:
artifactName: 'hyperspace-core_spark3.0'
pathtoPublish: '$(Build.ArtifactStagingDirectory)/hyperspace-core_spark3.0/'
- job: PythonTest
displayName: 'Run Python tests'
@ -97,16 +97,8 @@ jobs:
versionSpec: '8'
jdkArchitectureOption: 'x64'
jdkSourceOption: 'PreInstalled'
- script: wget -O /tmp/sbt.tgz "https://github.com/sbt/sbt/releases/download/v1.4.9/sbt-1.4.9.tgz"
displayName: 'Download sbt 1.4.9'
- script: tar zxf /tmp/sbt.tgz -C /tmp/
displayName: 'Extract sbt'
- script: /tmp/sbt/bin/sbt ++2.11.12 clean
displayName: 'Running $sbt clean'
- script: /tmp/sbt/bin/sbt ++2.11.12 update
displayName: 'Running $sbt update'
- script: /tmp/sbt/bin/sbt ++2.11.12 compile
displayName: 'Running $sbt compile'
- script: sbt ++2.11.12 "project spark2_4" clean update compile
displayName: 'Running $sbt clean & update & compile'
- task: Bash@3
inputs:
filePath: 'script/download_spark.sh'

build.sbt

@ -14,107 +14,128 @@
* limitations under the License.
*/
name := "hyperspace-core"
sparkVersion := "2.4.2"
import Dependencies._
import Path.relativeTo
lazy val scala212 = "2.12.8"
lazy val scala211 = "2.11.12"
lazy val supportedScalaVersions = List(scala212, scala211)
scalaVersion := scala212
crossScalaVersions := supportedScalaVersions
ThisBuild / scalacOptions ++= Seq("-target:jvm-1.8")
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided" withSources (),
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided" withSources (),
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided" withSources (),
"io.delta" %% "delta-core" % "0.6.1" % "provided" withSources (),
"org.apache.iceberg" % "iceberg-spark-runtime" % "0.11.0" % "provided" withSources (),
// Test dependencies
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"org.mockito" %% "mockito-scala" % "0.4.0" % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests")
ThisBuild / javaOptions += "-Xmx1024m"
assemblyMergeStrategy in assembly := {
case PathList("run-tests.py") => MergeStrategy.first
case x => (assemblyMergeStrategy in assembly).value(x)
}
scalacOptions ++= Seq("-target:jvm-1.8")
javaOptions += "-Xmx1024m"
// The following creates target/scala-2.*/src_managed/main/sbt-buildinfo/BuildInfo.scala.
// The root project is a virtual project aggregating the other projects.
// It cannot compile, as necessary utility code is only in those projects.
lazy val root = (project in file("."))
.aggregate(spark2_4, spark3_0)
.settings(
compile / skip := true,
publish / skip := true,
Keys.`package` := { new File("") }, // skip package
Keys.`packageBin` := { new File("") } // skip packageBin
)
lazy val spark2_4 = (project in file("spark2.4"))
.enablePlugins(BuildInfoPlugin)
.settings(
buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion),
buildInfoPackage := "com.microsoft.hyperspace")
commonSettings,
sparkVersion := Version(2, 4, 2),
crossScalaVersions := List(scala212, scala211),
inConfig(Compile)(addSparkVersionSpecificSourceDirectories),
inConfig(Test)(addSparkVersionSpecificSourceDirectories))
lazy val spark3_0 = (project in file("spark3.0"))
.enablePlugins(BuildInfoPlugin)
.settings(
commonSettings,
sparkVersion := Version(3, 0, 1),
crossScalaVersions := List(scala212), // Spark 3 doesn't support Scala 2.11
inConfig(Compile)(addSparkVersionSpecificSourceDirectories),
inConfig(Test)(addSparkVersionSpecificSourceDirectories))
lazy val sparkVersion = settingKey[Version]("sparkVersion")
// In addition to the usual scala/ and scala-<version>/ source directories,
// add the following source directories for different Spark versions:
// * scala-spark<spark major version>
// * scala-spark<spark major version>.<spark minor version>
// * scala-<scala version>-spark<spark major version>
// * scala-<scala version>-spark<spark major version>.<spark minor version>
lazy val addSparkVersionSpecificSourceDirectories = unmanagedSourceDirectories ++= Seq(
sourceDirectory.value / s"scala-spark${sparkVersion.value.major}",
sourceDirectory.value / s"scala-spark${sparkVersion.value.short}",
sourceDirectory.value / s"scala-${scalaBinaryVersion.value}-spark${sparkVersion.value.major}",
sourceDirectory.value / s"scala-${scalaBinaryVersion.value}-spark${sparkVersion.value.short}")
lazy val commonSettings = Seq(
// The following creates target/scala-2.*/src_managed/main/sbt-buildinfo/BuildInfo.scala.
buildInfoKeys := Seq[BuildInfoKey](
name,
version,
scalaVersion,
sbtVersion,
sparkVersion,
"sparkShortVersion" -> sparkVersion.value.short),
buildInfoPackage := "com.microsoft.hyperspace",
name := "hyperspace-core",
moduleName := name.value + s"_spark${sparkVersion.value.short}",
libraryDependencies ++= deps(sparkVersion.value),
// Scalastyle
scalastyleConfig := (ThisBuild / scalastyleConfig).value,
compileScalastyle := (Compile / scalastyle).toTask("").value,
Compile / compile := ((Compile / compile) dependsOn compileScalastyle).value,
testScalastyle := (Test / scalastyle).toTask("").value,
Test / test := ((Test / test) dependsOn testScalastyle).value,
// Package Python files
(Compile / packageBin / mappings) := (Compile / packageBin / mappings).value ++ listPythonFiles.value,
listPythonFiles := {
val pythonBase = (ThisBuild / baseDirectory).value / "python"
pythonBase ** "*.py" pair relativeTo(pythonBase)
})
lazy val listPythonFiles = taskKey[Seq[(File, String)]]("listPythonFiles")
/**
* ScalaStyle configurations
*/
scalastyleConfig := baseDirectory.value / "scalastyle-config.xml"
ThisBuild / scalastyleConfig := baseDirectory.value / "scalastyle-config.xml"
// Run as part of compile task.
lazy val compileScalastyle = taskKey[Unit]("compileScalastyle")
compileScalastyle := scalastyle.in(Compile).toTask("").value
(compile in Compile) := ((compile in Compile) dependsOn compileScalastyle).value
// Run as part of test task.
lazy val testScalastyle = taskKey[Unit]("testScalastyle")
testScalastyle := scalastyle.in(Test).toTask("").value
(test in Test) := ((test in Test) dependsOn testScalastyle).value
/**
* Spark Packages settings
*/
spName := "microsoft/hyperspace-core"
spAppendScalaVersion := true
spIncludeMaven := true
spIgnoreProvided := true
packageBin in Compile := spPackage.value
/**
* Test configurations
*/
// Tests cannot be run in parallel since multiple Spark contexts cannot run in the same JVM.
parallelExecution in Test := false
ThisBuild / Test / parallelExecution := false
fork in Test := true
ThisBuild / Test / fork := true
javaOptions in Test ++= Seq(
"-Dspark.ui.enabled=false",
"-Dspark.ui.showConsoleProgress=false",
"-Dspark.databricks.delta.snapshotPartitions=2",
"-Dspark.sql.shuffle.partitions=5",
"-Ddelta.log.cacheSize=3",
"-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
"-Xmx1024m")
ThisBuild / Test / javaOptions += "-Xmx1024m"
/**
* Release configurations
*/
organization := "com.microsoft.hyperspace"
organizationName := "Microsoft"
organizationHomepage := Some(url("http://www.microsoft.com/"))
ThisBuild / organization := "com.microsoft.hyperspace"
ThisBuild / organizationName := "Microsoft"
ThisBuild / organizationHomepage := Some(url("http://www.microsoft.com/"))
releaseCrossBuild := true
ThisBuild / releaseCrossBuild := true
scmInfo := Some(
ThisBuild / scmInfo := Some(
ScmInfo(
url("https://github.com/microsoft/hyperspace"),
"scm:git@github.com:microsoft/hyperspace.git"))
developers := List(
ThisBuild / developers := List(
Developer(
id = "rapoth",
name = "Rahul Potharaju",
@ -151,27 +172,31 @@ developers := List(
email = "",
url = url("https://github.com/thugsatbay")))
description := "Hyperspace: An Indexing Subsystem for Apache Spark"
licenses := List("Apache 2" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt"))
homepage := Some(url("https://github.com/microsoft/hyperspace"))
ThisBuild / description := "Hyperspace: An Indexing Subsystem for Apache Spark"
ThisBuild / licenses := List(
"Apache 2" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt"))
ThisBuild / homepage := Some(url("https://github.com/microsoft/hyperspace"))
// Remove all additional repository other than Maven Central from POM
pomIncludeRepository := { _ =>
ThisBuild / pomIncludeRepository := { _ =>
false
}
publishTo := {
ThisBuild / publishTo := {
val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value) Some("snapshots" at nexus + "content/repositories/snapshots")
else Some("releases" at nexus + "service/local/staging/deploy/maven2")
if (isSnapshot.value) {
Some("snapshots" at nexus + "content/repositories/snapshots")
} else {
Some("releases" at nexus + "service/local/staging/deploy/maven2")
}
}
publishMavenStyle := true
ThisBuild / publishMavenStyle := true
import ReleaseTransformations._
releasePublishArtifactsAction := PgpKeys.publishSigned.value
ThisBuild / releasePublishArtifactsAction := PgpKeys.publishSigned.value
releaseProcess := Seq[ReleaseStep](
ThisBuild / releaseProcess := Seq[ReleaseStep](
checkSnapshotDependencies,
inquireVersions,
runClean,
@ -182,3 +207,8 @@ releaseProcess := Seq[ReleaseStep](
publishArtifacts,
setNextVersion,
commitNextVersion)
/**
* Others
*/
bspEnabled := false

dev/.gitconfig Normal file

@ -0,0 +1,3 @@
[alias]
replace-symlinks = "!__git_replace_symlinks() {\nif [ -n \"$WINDIR\" ]; then\n symlinks=`git ls-files -s | awk '/120000/{print $4}'`\n cwd=`pwd`\n for s in $symlinks; do\n if [ -f $s ]; then\n target=`cat $s`\n dir=`dirname $s`\n name=`basename $s`\n cd $dir\n cmd <<< \"mklink /J \\\"$name.link\\\" \\\"$target\\\"\" > /dev/null\n if [ ! -e $name.link ]; then\n echo \"Failed to replace \\\"$s\\\" with a junction\"\n exit 1\n fi\n rm $name\n mv $name.link $name\n git update-index --assume-unchanged $name\n echo \"Replaced symlink $s with a junction\"\n cd \"$cwd\"\n fi\n done\nfi\n}\n__git_replace_symlinks"
restore-symlinks = "!__git_restore_symlinks() {\nsymlinks=`git ls-files -s | awk '/120000/{print $4}'`\nfor s in $symlinks; do\n if [ -h $s ]; then\n git update-index --no-assume-unchanged $s\n rm $s\n git checkout -- $s\n echo \"Restored symlink $s\"\n fi\ndone\n}\n__git_restore_symlinks"


@ -0,0 +1,42 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import sbt._
object Dependencies {
def deps(sparkVersion: Version) = {
val sv = sparkVersion.toString
Seq(
"org.apache.spark" %% "spark-catalyst" % sv % "provided" withSources (),
"org.apache.spark" %% "spark-core" % sv % "provided" withSources (),
"org.apache.spark" %% "spark-sql" % sv % "provided" withSources (),
// Test dependencies
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"org.mockito" %% "mockito-scala" % "0.4.0" % "test",
"org.apache.spark" %% "spark-catalyst" % sv % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sv % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sv % "test" classifier "tests") ++
(if (sparkVersion < Version(3, 0, 0))
Seq(
"io.delta" %% "delta-core" % "0.6.1" % "provided" withSources (),
"org.apache.iceberg" % "iceberg-spark-runtime" % "0.11.0" % "provided" withSources ())
else
Seq(
"io.delta" %% "delta-core" % "0.8.0" % "provided" withSources (),
"org.apache.iceberg" % "iceberg-spark3-runtime" % "0.11.1" % "provided" withSources (),
"org.apache.hive" % "hive-metastore" % "2.3.8" % "test"))
}
}

project/Version.scala Normal file

@ -0,0 +1,27 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
case class Version(major: Int, minor: Int, patch: Int) extends Ordered[Version] {
def short: String = s"$major.$minor"
override def toString: String = s"$major.$minor.$patch"
override def compare(that: Version): Int = {
import scala.math.Ordered.orderingToOrdered
(major, minor, patch) compare (that.major, that.minor, that.patch)
}
}
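As a quick illustration (not part of the commit) of how this ordering and the short form are used elsewhere in the build — `Dependencies.deps` compares against `Version(3, 0, 0)` to pick Delta/Iceberg artifacts, and `moduleName` appends the short form:
```scala
assert(Version(2, 4, 2) < Version(3, 0, 0)) // Spark 2 profile: Delta 0.6.1, iceberg-spark-runtime
assert(Version(3, 0, 1).short == "3.0")     // module name becomes hyperspace-core_spark3.0
```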


@ -14,4 +14,4 @@
* limitations under the License.
*/
sbt.version=0.13.18
sbt.version=1.5.0


@ -22,6 +22,4 @@ addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.6")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.7.0")


@ -88,7 +88,7 @@ def prepare(root_dir):
version = '0.0.0'
with open(os.path.join(root_dir, "version.sbt")) as fd:
version = fd.readline().split('"')[1]
package = "com.microsoft.hyperspace:hyperspace-core_2.12:" + version
package = "com.microsoft.hyperspace:hyperspace-core_spark2.4_2.12:" + version
return package

spark2.4/src Symbolic link

@ -0,0 +1 @@
../src

spark3.0/src Symbolic link

@ -0,0 +1 @@
../src


@ -0,0 +1,29 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.sources.delta
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
object DeltaLakeShims {
def getFiles(location: TahoeLogFileIndex): Seq[AddFile] = {
location
.getSnapshot(stalenessAcceptable = false)
.filesForScan(projection = Nil, location.partitionFilters, keepStats = false)
.files
}
}


@ -0,0 +1,56 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.sources.iceberg
import scala.collection.JavaConverters._
import org.apache.iceberg.Table
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.hive.HiveCatalogs
import org.apache.iceberg.spark.source.IcebergSource
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.sources.v2.DataSourceOptions
import com.microsoft.hyperspace.util.JavaConverters._
object IcebergShims {
def isIcebergRelation(plan: LogicalPlan): Boolean = plan match {
case DataSourceV2Relation(_: IcebergSource, _, _, _, _) => true
case _ => false
}
def loadIcebergTable(spark: SparkSession, plan: LogicalPlan): (Table, Option[Long]) = {
val conf = spark.sessionState.newHadoopConf()
val options = new DataSourceOptions(plan.asInstanceOf[DataSourceV2Relation].options.asJava)
val path = options.get("path").asScala.getOrElse {
throw new IllegalArgumentException("Cannot open table: path is not set")
}
val snapshotId = options.get("snapshot-id").asScala.map(_.toLong)
if (path.contains("/")) {
val tables = new HadoopTables(conf)
(tables.load(path), snapshotId)
} else {
val hiveCatalog = HiveCatalogs.loadCatalog(conf)
val tableIdentifier = TableIdentifier.parse(path)
(hiveCatalog.loadTable(tableIdentifier), snapshotId)
}
}
}


@ -0,0 +1,22 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace
package object shim {
val JoinWithoutHint = org.apache.spark.sql.catalyst.plans.logical.Join
val SQLExecution = org.apache.spark.sql.execution.SQLExecution
}


@ -0,0 +1,28 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.sources.delta
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
object DeltaLakeShims {
def getFiles(location: TahoeLogFileIndex): Seq[AddFile] = {
location.getSnapshot
.filesForScan(projection = Nil, location.partitionFilters)
.files
}
}


@ -0,0 +1,63 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.sources.iceberg
import org.apache.iceberg.Table
import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.util.hyperspace.Utils
object IcebergShims {
// In Spark 3, the V2ScanRelationPushDown rule can convert DataSourceV2Relation into
// DataSourceV2ScanRelation.
def isIcebergRelation(plan: LogicalPlan): Boolean = plan match {
case DataSourceV2Relation(_: SparkTable, _, _, _, _) => true
case DataSourceV2ScanRelation(_: SparkTable, _, _) => true
case _ => false
}
def loadIcebergTable(spark: SparkSession, plan: LogicalPlan): (Table, Option[Long]) =
plan match {
case r @ DataSourceV2Relation(table: SparkTable, _, _, _, _) =>
(table.table, Option(r.options.get("snapshot-id")).map(_.toLong))
case r @ DataSourceV2ScanRelation(table: SparkTable, _, _) =>
(table.table, getSnapshotId(r.scan))
case _ =>
throw new IllegalArgumentException(s"Unexpected plan type: ${plan.getClass.toString}")
}
// Temporary hack for Spark 3.0.
// There is no way to retrieve the snapshot id from DataSourceV2ScanRelation in Spark 3.0.
// We need to get it from a private field of SparkBatchQueryScan, a class that is itself not public.
// This hack won't be needed for Spark 3.1, as DataSourceV2ScanRelation will include
// DataSourceV2Relation and we can access the options.
private lazy val snapshotIdField = {
val f = Utils
.classForName("org.apache.iceberg.spark.source.SparkBatchQueryScan")
.getDeclaredField("snapshotId")
f.setAccessible(true)
f
}
private def getSnapshotId(scan: Scan): Option[Long] = {
Option(snapshotIdField.get(scan)).map(_.asInstanceOf[Long])
}
}


@ -0,0 +1,36 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.shim
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint, LogicalPlan}
object JoinWithoutHint {
def apply(
left: LogicalPlan,
right: LogicalPlan,
joinType: JoinType,
condition: Option[Expression]): Join = {
Join(left, right, joinType, condition, JoinHint.NONE)
}
def unapply(join: Join): Option[(LogicalPlan, LogicalPlan, JoinType, Option[Expression])] = {
Some((join.left, join.right, join.joinType, join.condition))
.filter(_ => join.hint == JoinHint.NONE)
}
}


@ -0,0 +1,26 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.shim
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution => RealSQLExecution}
object SQLExecution {
def withNewExecutionId[T](session: SparkSession, qe: QueryExecution)(body: => T): T = {
RealSQLExecution.withNewExecutionId(qe)(body)
}
}


@ -71,15 +71,16 @@ private[actions] abstract class RefreshActionBase(
// Reconstruct a df from schema
protected lazy val df = {
val relations = previousIndexLogEntry.relations
val latestRelation = Hyperspace
val relationMetadata = Hyperspace
.getContext(spark)
.sourceProviderManager
.getRelationMetadata(relations.head)
.refresh()
val latestRelation = relationMetadata.refresh()
val dataSchema = DataType.fromJson(latestRelation.dataSchemaJson).asInstanceOf[StructType]
val df = spark.read
.schema(dataSchema)
.format(latestRelation.fileFormat)
val df = {
if (relationMetadata.canSupportUserSpecifiedSchema) spark.read.schema(dataSchema)
else spark.read
}.format(latestRelation.fileFormat)
.options(latestRelation.options)
// Due to the difference in how the "path" option is set: https://github.com/apache/spark/
// blob/ef1441b56c5cab02335d8d2e4ff95cf7e9c9b9ca/sql/core/src/main/scala/org/apache/spark/


@ -21,9 +21,10 @@ import scala.collection.immutable.HashMap
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.DataSource
import com.microsoft.hyperspace.shim.SQLExecution
object DataFrameWriterExtensions {
/**


@ -176,7 +176,7 @@ object ExtractFilterNode {
case filter @ Filter(condition: Expression, ExtractRelation(relation))
if !RuleUtils.isIndexApplied(relation) =>
val relationColumnsName = relation.plan.output.map(_.name)
val relationColumnsName = relation.output.map(_.name)
val filterColumnNames = condition.references.map(_.name).toSeq
Some(filter, filter, relationColumnsName, filterColumnNames)


@ -30,6 +30,7 @@ import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.rankers.JoinIndexRanker
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.shim.JoinWithoutHint
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.ResolverUtils._
@ -55,7 +56,7 @@ object JoinIndexRule
with HyperspaceEventLogging
with ActiveSparkSession {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case join @ Join(l, r, _, Some(condition)) if isApplicable(l, r, condition) =>
case join @ JoinWithoutHint(l, r, _, Some(condition)) if isApplicable(l, r, condition) =>
try {
getBestIndexPair(l, r, condition)
.map {
@ -236,8 +237,8 @@ object JoinIndexRule
condition: Expression): Boolean = {
// Output attributes from base relations. Join condition attributes must belong to these
// attributes. We work on canonicalized forms to make sure we support case-sensitivity.
val lBaseAttrs = l.plan.output.map(_.canonicalized)
val rBaseAttrs = r.plan.output.map(_.canonicalized)
val lBaseAttrs = l.output.map(_.canonicalized)
val rBaseAttrs = r.output.map(_.canonicalized)
def fromDifferentBaseRelations(c1: Expression, c2: Expression): Boolean = {
(lBaseAttrs.contains(c1) && rBaseAttrs.contains(c2)) ||
@ -295,8 +296,8 @@ object JoinIndexRule
// been already checked in `isApplicable`.
val leftRelation = RuleUtils.getRelation(spark, left).get
val rightRelation = RuleUtils.getRelation(spark, right).get
val lBaseAttrs = leftRelation.plan.output.map(_.name)
val rBaseAttrs = rightRelation.plan.output.map(_.name)
val lBaseAttrs = leftRelation.output.map(_.name)
val rBaseAttrs = rightRelation.output.map(_.name)
// Map of left resolved columns with their corresponding right resolved
// columns from condition.


@ -271,12 +271,12 @@ object RuleUtils {
val indexFsRelation = new IndexHadoopFsRelation(
location,
new StructType(),
StructType(index.schema.filter(relation.plan.schema.contains(_))),
StructType(index.schema.filter(relation.schema.contains(_))),
if (useBucketSpec) Some(index.bucketSpec) else None,
new ParquetFileFormat,
Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)
val updatedOutput = relation.plan.output
val updatedOutput = relation.output
.filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name))
.map(_.asInstanceOf[AttributeReference])
relation.createLogicalRelation(indexFsRelation, updatedOutput)
@ -369,7 +369,7 @@ object RuleUtils {
// rows from the deleted files.
val newSchema = StructType(
index.schema.filter(s =>
relation.plan.schema.contains(s) || (filesDeleted.nonEmpty && s.name.equals(
relation.schema.contains(s) || (filesDeleted.nonEmpty && s.name.equals(
IndexConstants.DATA_FILE_NAME_ID))))
def fileIndex: InMemoryFileIndex = {
@ -390,7 +390,7 @@ object RuleUtils {
new ParquetFileFormat,
Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)
val updatedOutput = relation.plan.output
val updatedOutput = relation.output
.filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name))
.map(_.asInstanceOf[AttributeReference])
@ -477,9 +477,9 @@ object RuleUtils {
// Set the same output schema with the index plan to merge them using BucketUnion.
// Include partition columns for data loading.
val partitionColumns = relation.partitionSchema.map(_.name)
val updatedSchema = StructType(relation.plan.schema.filter(col =>
val updatedSchema = StructType(relation.schema.filter(col =>
index.schema.contains(col) || relation.partitionSchema.contains(col)))
val updatedOutput = relation.plan.output
val updatedOutput = relation.output
.filter(attr =>
index.schema.fieldNames.contains(attr.name) || partitionColumns.contains(attr.name))
.map(_.asInstanceOf[AttributeReference])


@ -17,7 +17,7 @@
package com.microsoft.hyperspace.index.sources.default
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.hyperspace.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@ -159,7 +159,7 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe
.map { path =>
val hdfsPath = new Path(path)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
qualified.toString -> SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
qualified.toString -> SparkHadoopUtil.globPathIfNecessary(fs, qualified)
}
.toMap


@ -36,4 +36,6 @@ class DefaultFileBasedRelationMetadata(metadata: Relation) extends FileBasedRela
override def enrichIndexProperties(properties: Map[String, String]): Map[String, String] = {
properties
}
override def canSupportUserSpecifiedSchema: Boolean = true
}


@ -47,10 +47,8 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation)
*/
lazy override val allFiles: Seq[FileStatus] = plan.relation match {
case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) =>
location
.getSnapshot(stalenessAcceptable = false)
.filesForScan(projection = Nil, location.partitionFilters, keepStats = false)
.files
DeltaLakeShims
.getFiles(location)
.map { f =>
toFileStatus(f.size, f.modificationTime, new Path(location.path, f.path))
}
@ -74,10 +72,8 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation)
override def createRelationMetadata(fileIdTracker: FileIdTracker): Relation = {
plan.relation match {
case HadoopFsRelation(location: TahoeLogFileIndex, _, dataSchema, _, _, options) =>
val files = location
.getSnapshot(stalenessAcceptable = false)
.filesForScan(projection = Nil, location.partitionFilters, keepStats = false)
.files
val files = DeltaLakeShims
.getFiles(location)
.map { f =>
toFileStatus(f.size, f.modificationTime, new Path(location.path, f.path))
}


@ -53,4 +53,6 @@ class DeltaLakeRelationMetadata(metadata: Relation) extends FileBasedRelationMet
}
properties ++ deltaVerHistory
}
override def canSupportUserSpecifiedSchema: Boolean = false
}


@ -16,7 +16,6 @@
package com.microsoft.hyperspace.index.sources.iceberg
import org.apache.iceberg.spark.source.IcebergSource
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@ -28,8 +27,9 @@ import com.microsoft.hyperspace.index.sources.{FileBasedRelation, FileBasedRelat
* Iceberg file-based source provider.
*
* This source can support relations that meet the following criteria:
* - The relation is with [[DataSourceV2Relation]]
* - The source of [[DataSourceV2Relation]] is an [[IcebergSource]]
* - The relation is with [[DataSourceV2Relation]] or [[DataSourceV2ScanRelation]]
* - The source of [[DataSourceV2Relation]] is an [[IcebergSource]],
* or the table is an Iceberg table.
*/
class IcebergFileBasedSource(private val spark: SparkSession) extends FileBasedSourceProvider {
@ -41,10 +41,8 @@ class IcebergFileBasedSource(private val spark: SparkSession) extends FileBasedS
* @param plan Logical plan to check if it's supported.
* @return Some(true) if the given plan is a supported relation, otherwise None.
*/
override def isSupportedRelation(plan: LogicalPlan): Option[Boolean] = plan match {
case _ @DataSourceV2Relation(_: IcebergSource, _, _, _, _) =>
Some(true)
case _ => None
override def isSupportedRelation(plan: LogicalPlan): Option[Boolean] = {
Some(IcebergShims.isIcebergRelation(plan)).filter(_ == true)
}
/**
@ -56,7 +54,8 @@ class IcebergFileBasedSource(private val spark: SparkSession) extends FileBasedS
*/
override def getRelation(plan: LogicalPlan): Option[FileBasedRelation] = {
if (isSupportedRelation(plan).contains(true)) {
Some(new IcebergRelation(spark, plan.asInstanceOf[DataSourceV2Relation]))
val (table, snapshotId) = IcebergShims.loadIcebergTable(spark, plan)
Some(new IcebergRelation(spark, table, snapshotId, plan))
} else {
None
}


@ -16,65 +16,72 @@
package com.microsoft.hyperspace.index.sources.iceberg
import java.util.Locale
import collection.JavaConverters._
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.iceberg.{FileScanTask, Schema, Table}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.hive.HiveCatalogs
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.iceberg.spark.source.IcebergSource
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.types.StructType
import com.microsoft.hyperspace.index.{Content, FileIdTracker, FileInfo, Hdfs, IndexConstants, Relation}
import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, IndexConstants, Relation}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.PathUtils
/**
* Implementation for file-based relation used by [[IcebergFileBasedSource]]
*/
class IcebergRelation(spark: SparkSession, override val plan: DataSourceV2Relation)
class IcebergRelation(
spark: SparkSession,
table: Table,
snapshotId: Option[Long],
override val plan: LogicalPlan)
extends FileBasedRelation {
/**
* The schema of the underlying table.
*
* In Spark 3.0, V2ScanRelationPushDown replaces DataSourceV2Relation with
* DataSourceV2ScanRelation, with a changed schema only containing projected
 * columns. The projected columns might not include the partition columns, so we need to
* use the schema of the table and construct output from it to make it work
* with partition-aware hybrid scan.
*/
override def schema: StructType = SparkSchemaUtil.convert(table.schema)
override def output: Seq[Attribute] = {
plan.output ++
table.schema.columns.asScala
.filterNot(col => plan.output.exists(attr => col.name == attr.name))
.map(col => AttributeReference(col.name, SparkSchemaUtil.convert(col.`type`))())
}
/**
* Computes the signature of the current relation.
*/
override def signature: String = plan.source match {
case _: IcebergSource =>
val table = loadIcebergTable
val snapshotId = plan.options.getOrElse("snapshot-id", table.currentSnapshot().snapshotId())
snapshotId + table.location()
override def signature: String = {
snapshotId.getOrElse(table.currentSnapshot().snapshotId()).toString + table.location()
}
/**
* All the files that the current Iceberg table uses for read.
*/
override lazy val allFiles: Seq[FileStatus] = plan.source match {
case _: IcebergSource =>
loadIcebergTable.newScan().planFiles().iterator().asScala.toSeq.map(toFileStatus)
override lazy val allFiles: Seq[FileStatus] = {
table.newScan().planFiles().iterator().asScala.toSeq.map(toFileStatus)
}
/**
* The optional partition base path of the current relation.
*/
override def partitionBasePath: Option[String] = plan.source match {
case _: IcebergSource =>
val table = loadIcebergTable
if (table.spec().isUnpartitioned) {
None
} else {
Some(
PathUtils.makeAbsolute(table.location(), spark.sessionState.newHadoopConf()).toString)
}
case _ => None
override def partitionBasePath: Option[String] = {
if (table.spec().isUnpartitioned) {
None
} else {
Some(PathUtils.makeAbsolute(table.location(), spark.sessionState.newHadoopConf()).toString)
}
}
/**
@ -84,33 +91,28 @@ class IcebergRelation(spark: SparkSession, override val plan: DataSourceV2Relati
* @return [[Relation]] object that describes the current relation.
*/
override def createRelationMetadata(fileIdTracker: FileIdTracker): Relation = {
plan.source match {
case source: IcebergSource =>
val dsOpts = new DataSourceOptions(plan.options.asJava)
val table = loadIcebergTable
val reader = source.createReader(dsOpts)
val files = allFiles
val files = allFiles
val sourceDataProperties =
Hdfs.Properties(Content.fromLeafFiles(files, fileIdTracker).get)
val fileFormatName = "iceberg"
val currentSnapshot = table.currentSnapshot()
val basePathOpt = partitionBasePath.map(p => Map("basePath" -> p)).getOrElse(Map.empty)
val opts = plan.options - "path" +
("snapshot-id" -> currentSnapshot.snapshotId().toString) +
("as-of-timestamp" -> currentSnapshot.timestampMillis().toString) ++
basePathOpt
val sourceDataProperties =
Hdfs.Properties(Content.fromLeafFiles(files, fileIdTracker).get)
val fileFormatName = "iceberg"
val currentSnapshot = table.currentSnapshot()
val basePathOpt =
partitionBasePath.map(p => Map("basePath" -> p)).getOrElse(Map.empty)
val opts = Map(
"snapshot-id" -> currentSnapshot.snapshotId().toString,
"as-of-timestamp" -> currentSnapshot.timestampMillis().toString) ++
basePathOpt
Relation(
Seq(
PathUtils
.makeAbsolute(table.location(), spark.sessionState.newHadoopConf())
.toString),
Hdfs(sourceDataProperties),
reader.readSchema().json,
fileFormatName,
opts)
}
Relation(
Seq(
PathUtils
.makeAbsolute(table.location(), spark.sessionState.newHadoopConf())
.toString),
Hdfs(sourceDataProperties),
SparkSchemaUtil.convert(table.schema).json,
fileFormatName,
opts)
}
/**
@ -152,20 +154,22 @@ class IcebergRelation(spark: SparkSession, override val plan: DataSourceV2Relati
/**
* Options of the current relation.
*
* In Spark 3, DataSourceV2Relation can be converted into DataSourceV2ScanRelation
* and we cannot access the options. This should be okay, as the only values we were
 * using were "path" and "snapshot-id", which can be retrieved by other means.
*/
override def options: Map[String, String] = plan.options
override def options: Map[String, String] = Map[String, String]()
/**
* The partition schema of the current relation.
*/
override def partitionSchema: StructType = plan.source match {
case _: IcebergSource =>
val tbl = loadIcebergTable
val fields = tbl.spec().fields().asScala.map { p =>
tbl.schema().findField(p.name())
}
val schema = new Schema(fields.asJava)
SparkSchemaUtil.convert(schema)
override def partitionSchema: StructType = {
val fields = table.spec().fields().asScala.map { p =>
table.schema().findField(p.name())
}
val schema = new Schema(fields.asJava)
SparkSchemaUtil.convert(schema)
}
/**
@ -176,15 +180,14 @@ class IcebergRelation(spark: SparkSession, override val plan: DataSourceV2Relati
override def createHadoopFsRelation(
location: FileIndex,
dataSchema: StructType,
options: Map[String, String]): HadoopFsRelation = plan.source match {
case _: IcebergSource =>
HadoopFsRelation(
location,
partitionSchema,
dataSchema,
None,
new ParquetFileFormat,
options + IndexConstants.INDEX_RELATION_IDENTIFIER)(spark)
options: Map[String, String]): HadoopFsRelation = {
HadoopFsRelation(
location,
partitionSchema,
dataSchema,
None,
new ParquetFileFormat,
options + IndexConstants.INDEX_RELATION_IDENTIFIER)(spark)
}
/**
@ -200,23 +203,6 @@ class IcebergRelation(spark: SparkSession, override val plan: DataSourceV2Relati
new LogicalRelation(hadoopFsRelation, updatedOutput, None, false)
}
private def loadIcebergTable: Table = {
val options = new DataSourceOptions(plan.options.asJava)
val conf = spark.sessionState.newHadoopConf()
val path = options.get("path")
if (!path.isPresent) {
throw new IllegalArgumentException("Cannot open table: path is not set")
}
if (path.get.contains("/")) {
val tables = new HadoopTables(conf)
tables.load(path.get)
} else {
val hiveCatalog = HiveCatalogs.loadCatalog(conf)
val tableIdentifier = TableIdentifier.parse(path.get)
hiveCatalog.loadTable(tableIdentifier)
}
}
private def toFileStatus(fileScanTask: FileScanTask): FileStatus = {
val path = PathUtils.makeAbsolute(
new Path(fileScanTask.file().path().toString),


@ -35,4 +35,6 @@ class IcebergRelationMetadata(metadata: Relation) extends FileBasedRelationMetad
override def enrichIndexProperties(properties: Map[String, String]): Map[String, String] = {
properties
}
override def canSupportUserSpecifiedSchema: Boolean = false
}


@ -18,7 +18,7 @@ package com.microsoft.hyperspace.index.sources
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.StructType
@ -47,6 +47,16 @@ trait FileBasedRelation extends SourceRelation {
*/
def plan: LogicalPlan
/**
* The schema of the relation.
*/
def schema: StructType = plan.schema
/**
* The output of the relation.
*/
def output: Seq[Attribute] = plan.output
/**
* Options of the current relation.
*/
@ -252,4 +262,9 @@ trait FileBasedRelationMetadata extends SourceRelationMetadata {
* @param properties Index properties to enrich.
*/
def enrichIndexProperties(properties: Map[String, String]): Map[String, String]
/**
* Returns true if the source supports user specified schema, false otherwise.
*/
def canSupportUserSpecifiedSchema: Boolean
}


@ -0,0 +1,34 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.util
import java.util.Optional
object JavaConverters {
def optionalAsScalaOption[T](o: Optional[T]): Option[T] = {
if (o.isPresent) Some(o.get) else None
}
implicit def optionalAsScalaOptionConverter[T](o: Optional[T]): AsScala[Option[T]] = {
new AsScala(optionalAsScalaOption(o))
}
class AsScala[T](op: => T) {
def asScala: T = op
}
}
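A minimal usage sketch (illustrative, with a made-up path) of the `asScala` enrichment on `java.util.Optional`, matching how the Spark 2.4 `IcebergShims` reads `DataSourceOptions` above:
```scala
import java.util.Optional
import com.microsoft.hyperspace.util.JavaConverters._

val path: Option[String] = Optional.of("/tmp/iceberg-table").asScala          // Some("/tmp/iceberg-table")
val snapshotId: Option[Long] = Optional.empty[String]().asScala.map(_.toLong) // None
```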


@ -0,0 +1,25 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.hyperspace
import org.apache.hadoop.fs.{FileSystem, Path}
// SparkHadoopUtil became package-private in Spark 3.
object SparkHadoopUtil {
def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] =
org.apache.spark.deploy.SparkHadoopUtil.get.globPathIfNecessary(fs, pattern)
}


@ -0,0 +1,20 @@
$begin=============================================================
Plan with indexes:
=============================================================
Project [Col1#]
+- Filter (isnotnull(Col2#) && (Col2# = 2))
$highlightBegin+- FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#,Col1#] Batched: true, Format: Parquet, Location: $filterIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ReadSchema: struct<Col2:int,Col1:string>$highlightEnd
=============================================================
Plan without indexes:
=============================================================
Project [Col1#]
+- Filter (isnotnull(Col2#) && (Col2# = 2))
$highlightBegin+- FileScan parquet [Col1#,Col2#] Batched: true, Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ReadSchema: struct<Col1:string,Col2:int>$highlightEnd
=============================================================
Indexes used:
=============================================================
filterIndex:$filterIndexPath
$end


@ -0,0 +1,46 @@
=============================================================
Plan with indexes:
=============================================================
SortMergeJoin [Col1#], [Col1#], Inner
<----:- *(1) Project [Col1#, Col2#]---->
<----: +- *(1) Filter isnotnull(Col1#)---->
<----: +- *(1) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#,Col2#] Batched: true, Format: Parquet, Location: $joinIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->
<----+- *(2) Project [Col1#, Col2#]---->
<----+- *(2) Filter isnotnull(Col1#)---->
<----+- *(2) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#,Col2#] Batched: true, Format: Parquet, Location: $joinIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->
=============================================================
Plan without indexes:
=============================================================
SortMergeJoin [Col1#], [Col1#], Inner
<----:- *(2) Sort [Col1# ASC NULLS FIRST], false, 0---->
<----: +- Exchange hashpartitioning(Col1#, 5)---->
<----: +- *(1) Project [Col1#, Col2#]---->
<----: +- *(1) Filter isnotnull(Col1#)---->
<----: +- *(1) FileScan parquet [Col1#,Col2#] Batched: true, Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>---->
<----+- *(4) Sort [Col1# ASC NULLS FIRST], false, 0---->
<----+- ReusedExchange [Col1#, Col2#], Exchange hashpartitioning(Col1#, 5)---->
=============================================================
Indexes used:
=============================================================
joinIndex:$joinIndexPath
=============================================================
Physical operator stats:
=============================================================
+----------------------------------------------------------+-------------------+------------------+----------+
| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+----------------------------------------------------------+-------------------+------------------+----------+
| *Filter| 1| 2| 1|
| *InputAdapter| 4| 2| -2|
| *Project| 1| 2| 1|
| *ReusedExchange| 1| 0| -1|
|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)| 0| 2| 2|
| *Scan parquet| 1| 0| -1|
| *ShuffleExchange| 1| 0| -1|
| *Sort| 2| 0| -2|
| *WholeStageCodegen| 4| 3| -1|
| SortMergeJoin| 1| 1| 0|
+----------------------------------------------------------+-------------------+------------------+----------+


@ -0,0 +1,45 @@
=============================================================
Plan with indexes:
=============================================================
SortMergeJoin [Col1#], [Col1#], Inner
<----:- *(1) Project [Col1#, Col2#]---->
<----: +- *(1) Filter isnotnull(Col1#)---->
<----: +- *(1) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#,Col2#] Batched: true, Format: Parquet, Location: $joinIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->
<----+- *(2) Project [Col1#, Col2#]---->
<----+- *(2) Filter isnotnull(Col1#)---->
<----+- *(2) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#,Col2#] Batched: true, Format: Parquet, Location: $joinIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->
=============================================================
Plan without indexes:
=============================================================
SortMergeJoin [Col1#], [Col1#], Inner
<----:- *(1) Sort [Col1# ASC NULLS FIRST], false, 0---->
<----: +- *(1) Project [Col1#, Col2#]---->
<----: +- *(1) Filter isnotnull(Col1#)---->
<----: +- *(1) ScanV2 iceberg[Col1#, Col2#] (Filters: [isnotnull(Col1#)], Options: $icebergOptions)---->
<----+- *(2) Sort [Col1# ASC NULLS FIRST], false, 0---->
<----+- *(2) Project [Col1#, Col2#]---->
<----+- *(2) Filter isnotnull(Col1#)---->
<----+- *(2) ScanV2 iceberg[Col1#, Col2#] (Filters: [isnotnull(Col1#)], Options: $icebergOptions)---->
=============================================================
Indexes used:
=============================================================
joinIndex:$joinIndexPath
=============================================================
Physical operator stats:
=============================================================
+----------------------------------------------------------+-------------------+------------------+----------+
| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+----------------------------------------------------------+-------------------+------------------+----------+
| *DataSourceV2Scan| 2| 0| -2|
|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)| 0| 2| 2|
| *Sort| 2| 0| -2|
| Filter| 2| 2| 0|
| InputAdapter| 2| 2| 0|
| Project| 2| 2| 0|
| SortMergeJoin| 1| 1| 0|
| WholeStageCodegen| 3| 3| 0|
+----------------------------------------------------------+-------------------+------------------+----------+


@ -0,0 +1,47 @@
=============================================================
Plan with indexes:
=============================================================
Project [Col1#]
+- Filter (isnotnull(Col1#) && (Col1# = Subquery subquery145))
: +- Subquery subquery145
: +- *(1) Project [Col1#]
: +- *(1) Filter (isnotnull(Col2#) && (Col2# = 1))
<----: +- *(1) FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#,Col1#] Batched: true, Format: Parquet, Location: $filterIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct<Col2:int,Col1:string>---->
+- FileScan parquet [Col1#] Batched: true, Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string>
+- Subquery subquery145
+- *(1) Project [Col1#]
+- *(1) Filter (isnotnull(Col2#) && (Col2# = 1))
<----+- *(1) FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#,Col1#] Batched: true, Format: Parquet, Location: $filterIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct<Col2:int,Col1:string>---->
=============================================================
Plan without indexes:
=============================================================
Project [Col1#]
+- Filter (isnotnull(Col1#) && (Col1# = Subquery subquery145))
: +- Subquery subquery145
: +- *(1) Project [Col1#]
: +- *(1) Filter (isnotnull(Col2#) && (Col2# = 1))
<----: +- *(1) FileScan parquet [Col1#,Col2#] Batched: true, Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct<Col1:string,Col2:int>---->
+- FileScan parquet [Col1#] Batched: true, Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string>
+- Subquery subquery145
+- *(1) Project [Col1#]
+- *(1) Filter (isnotnull(Col2#) && (Col2# = 1))
<----+- *(1) FileScan parquet [Col1#,Col2#] Batched: true, Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct<Col1:string,Col2:int>---->
=============================================================
Indexes used:
=============================================================
filterIndex:$filterIndexPath
=============================================================
Physical operator stats:
=============================================================
+-----------------+-------------------+------------------+----------+
|Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+-----------------+-------------------+------------------+----------+
| Filter| 1| 1| 0|
| Project| 1| 1| 0|
| Scan parquet| 1| 1| 0|
|WholeStageCodegen| 1| 1| 0|
+-----------------+-------------------+------------------+----------+


@ -0,0 +1,22 @@
$begin=============================================================
Plan with indexes:
=============================================================
Project [Col1#]
+- Filter (isnotnull(Col2#) AND (Col2# = 2))
+- ColumnarToRow
$highlightBegin+- FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#,Col1#] Batched: true, DataFilters: [isnotnull(Col2#), (Col2# = 2)], Format: Parquet, Location: $filterIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ReadSchema: struct<Col2:int,Col1:string>$highlightEnd
=============================================================
Plan without indexes:
=============================================================
Project [Col1#]
+- Filter (isnotnull(Col2#) AND (Col2# = 2))
+- ColumnarToRow
$highlightBegin+- FileScan parquet [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col2#), (Col2# = 2)], Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ReadSchema: struct<Col1:string,Col2:int>$highlightEnd
=============================================================
Indexes used:
=============================================================
filterIndex:$filterIndexPath
$end


@ -0,0 +1,54 @@
=============================================================
Plan with indexes:
=============================================================
SortMergeJoin [Col1#], [Col1#], Inner
<----:- *(1) Project [Col1#, Col2#]---->
<----: +- *(1) Filter isnotnull(Col1#)---->
<----: +- *(1) ColumnarToRow---->
<----: +- FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $joinIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->
<----+- *(2) Project [Col1#, Col2#]---->
<----+- *(2) Filter isnotnull(Col1#)---->
<----+- *(2) ColumnarToRow---->
<----+- FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $joinIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->
=============================================================
Plan without indexes:
=============================================================
SortMergeJoin [Col1#], [Col1#], Inner
<----:- *(2) Sort [Col1# ASC NULLS FIRST], false, 0---->
<----: +- Exchange hashpartitioning(Col1#, 5), true, [id=#]---->
<----: +- *(1) Project [Col1#, Col2#]---->
<----: +- *(1) Filter isnotnull(Col1#)---->
<----: +- *(1) ColumnarToRow---->
<----: +- FileScan parquet [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>---->
<----+- *(4) Sort [Col1# ASC NULLS FIRST], false, 0---->
<----+- ReusedExchange [Col1#, Col2#], Exchange hashpartitioning(Col1#, 5), true, [id=#]---->
=============================================================
Indexes used:
=============================================================
joinIndex:$joinIndexPath
=============================================================
Physical operator stats:
=============================================================
+----------------------------------------------------------+-------------------+------------------+----------+
| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+----------------------------------------------------------+-------------------+------------------+----------+
| *ColumnarToRow| 1| 2| 1|
| *Filter| 1| 2| 1|
| *InputAdapter| 5| 4| -1|
| *Project| 1| 2| 1|
| *ReusedExchange| 1| 0| -1|
|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)| 0| 2| 2|
| *Scan parquet| 1| 0| -1|
| *ShuffleExchange| 1| 0| -1|
| *Sort| 2| 0| -2|
| *WholeStageCodegen (3)| 0| 1| 1|
| *WholeStageCodegen (4)| 1| 0| -1|
| *WholeStageCodegen (5)| 1| 0| -1|
| SortMergeJoin| 1| 1| 0|
| WholeStageCodegen (1)| 1| 1| 0|
| WholeStageCodegen (2)| 1| 1| 0|
+----------------------------------------------------------+-------------------+------------------+----------+


@ -0,0 +1,50 @@
=============================================================
Plan with indexes:
=============================================================
SortMergeJoin [Col1#], [Col1#], Inner
<----:- *(1) Project [Col1#, Col2#]---->
<----: +- *(1) Filter isnotnull(Col1#)---->
<----: +- *(1) ColumnarToRow---->
<----: +- FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $joinIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->
<----+- *(2) Project [Col1#, Col2#]---->
<----+- *(2) Filter isnotnull(Col1#)---->
<----+- *(2) ColumnarToRow---->
<----+- FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $joinIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->
=============================================================
Plan without indexes:
=============================================================
SortMergeJoin [Col1#], [Col1#], Inner
<----:- *(1) Sort [Col1# ASC NULLS FIRST], false, 0---->
<----: +- *(1) Project [Col1#, Col2#]---->
<----: +- *(1) Filter isnotnull(Col1#)---->
<----: +- BatchScan[Col1#, Col2#] $icebergPath [filters=Col1 IS NOT NULL]---->
<----+- *(2) Sort [Col1# ASC NULLS FIRST], false, 0---->
<----+- *(2) Project [Col1#, Col2#]---->
<----+- *(2) Filter isnotnull(Col1#)---->
<----+- BatchScan[Col1#, Col2#] $icebergPath [filters=Col1 IS NOT NULL]---->
=============================================================
Indexes used:
=============================================================
joinIndex:$joinIndexPath
=============================================================
Physical operator stats:
=============================================================
+----------------------------------------------------------+-------------------+------------------+----------+
| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+----------------------------------------------------------+-------------------+------------------+----------+
| *BatchScan| 2| 0| -2|
| *ColumnarToRow| 0| 2| 2|
|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)| 0| 2| 2|
| *Sort| 2| 0| -2|
| Filter| 2| 2| 0|
| InputAdapter| 4| 4| 0|
| Project| 2| 2| 0|
| SortMergeJoin| 1| 1| 0|
| WholeStageCodegen (1)| 1| 1| 0|
| WholeStageCodegen (2)| 1| 1| 0|
| WholeStageCodegen (3)| 1| 1| 0|
+----------------------------------------------------------+-------------------+------------------+----------+


@ -0,0 +1,45 @@
=============================================================
Plan with indexes:
=============================================================
Project [Col1#]
+- Filter (isnotnull(Col1#) AND (Col1# = Subquery scalar-subquery#, [id=#]))
: +- Subquery scalar-subquery#, [id=#]
: +- *(1) Project [Col1#]
: +- *(1) Filter (isnotnull(Col2#) AND (Col2# = 1))
: +- *(1) ColumnarToRow
<----: +- FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#,Col1#] Batched: true, DataFilters: [isnotnull(Col2#), (Col2# = 1)], Format: Parquet, Location: $filterIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct<Col2:int,Col1:string>---->
+- ColumnarToRow
+- FileScan parquet [Col1#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string>
=============================================================
Plan without indexes:
=============================================================
Project [Col1#]
+- Filter (isnotnull(Col1#) AND (Col1# = Subquery scalar-subquery#, [id=#]))
: +- Subquery scalar-subquery#, [id=#]
: +- *(1) Project [Col1#]
: +- *(1) Filter (isnotnull(Col2#) AND (Col2# = 1))
: +- *(1) ColumnarToRow
<----: +- FileScan parquet [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col2#), (Col2# = 1)], Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct<Col1:string,Col2:int>---->
+- ColumnarToRow
+- FileScan parquet [Col1#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string>
=============================================================
Indexes used:
=============================================================
filterIndex:$filterIndexPath
=============================================================
Physical operator stats:
=============================================================
+---------------------+-------------------+------------------+----------+
| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+---------------------+-------------------+------------------+----------+
| ColumnarToRow| 1| 1| 0|
| Filter| 1| 1| 0|
| InputAdapter| 1| 1| 0|
| Project| 1| 1| 0|
| Scan parquet| 1| 1| 0|
|WholeStageCodegen (1)| 1| 1| 0|
+---------------------+-------------------+------------------+----------+


@ -0,0 +1,35 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.util
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.execution.command.ExplainCommand
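// Shims for Spark 2.4: TreeNode.simpleString takes no arguments here, and
// ExplainCommand is constructed with the `extended` flag rather than an ExplainMode.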
object SparkTestShims {
object Implicits {
implicit class TreeNodeExt(node: TreeNode[_]) {
def simpleStringFull: String = node.simpleString
}
}
object SimpleExplainCommand {
def apply(logicalPlan: LogicalPlan): ExplainCommand = {
ExplainCommand(logicalPlan, extended = false)
}
}
}


@ -0,0 +1,36 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.util
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.execution.SimpleMode
import org.apache.spark.sql.execution.command.ExplainCommand
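// Shims for Spark 3.0: TreeNode.simpleString requires a maxFields argument, and
// ExplainCommand takes an ExplainMode (SimpleMode) instead of boolean flags.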
object SparkTestShims {
object Implicits {
implicit class TreeNodeExt(node: TreeNode[_]) {
def simpleStringFull: String = node.simpleString(Int.MaxValue)
}
}
object SimpleExplainCommand {
def apply(logicalPlan: LogicalPlan): ExplainCommand = {
ExplainCommand(logicalPlan, SimpleMode)
}
}
}
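
The two shim files above expose an identical `SparkTestShims` surface, so test code compiles unchanged against either Spark version. A minimal sketch of version-agnostic usage, assuming an existing `SparkSession` and an illustrative `renderPlan` helper that is not part of this change:

```scala
import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace.util.SparkTestShims
import com.microsoft.hyperspace.util.SparkTestShims.Implicits._

// Illustrative helper: renders a query plan the same way on Spark 2.4 and 3.0.
def renderPlan(spark: SparkSession, query: String): String = {
  val plan = spark.sql(query).queryExecution.optimizedPlan
  // simpleStringFull hides the simpleString signature change between versions.
  val summary = plan.simpleStringFull
  // SimpleExplainCommand hides the ExplainCommand constructor change.
  val explain = SparkTestShims.SimpleExplainCommand(plan)
  s"$summary\n$explain"
}
```

`PlanStabilitySuite` below relies on the same shim when it builds its explain command.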


@ -23,10 +23,8 @@ import com.microsoft.hyperspace.index.{HyperspaceSuite, IndexConfig}
import com.microsoft.hyperspace.util.FileUtils
class HyperspaceTest extends HyperspaceSuite {
override val systemPath = new Path("src/test/resources/indexLocation")
private val sampleData = SampleData.testData
private val sampleParquetDataLocation = "src/test/resources/sampleparquet"
private val sampleParquetDataLocation = inTempDir("sampleparquet")
private val indexConfig1 = IndexConfig("index1", Seq("RGUID"), Seq("Date"))
private val indexConfig2 = IndexConfig("index2", Seq("Query"), Seq("imprs"))
private var df: DataFrame = _


@ -34,6 +34,15 @@ trait SparkInvolvedSuite extends BeforeAndAfterAll with BeforeAndAfter {
.builder()
.master(s"local[$numParallelism]")
.config(HYPERSPACE_EVENT_LOGGER_CLASS_KEY, "com.microsoft.hyperspace.MockEventLogger")
.config("delta.log.cacheSize", "3")
.config("spark.databricks.delta.snapshotPartitions", "2")
.config("spark.driver.bindAddress", "127.0.0.1")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.shuffle.partitions", "5")
.config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "5")
.config("spark.ui.enabled", "false")
.config("spark.ui.showConsoleProgress", "false")
.appName(suiteName)
.getOrCreate()


@ -29,10 +29,9 @@ import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager
import com.microsoft.hyperspace.util.FileUtils
class CreateActionTest extends SparkFunSuite with SparkInvolvedSuite with SQLHelper {
private val indexSystemPath = "src/test/resources/indexLocation"
class CreateActionTest extends HyperspaceSuite with SQLHelper {
private val sampleData = SampleData.testData
private val sampleParquetDataLocation = "src/test/resources/sampleparquet"
private val sampleParquetDataLocation = inTempDir("sampleparquet")
private val indexConfig = IndexConfig("index1", Seq("RGUID"), Seq("Date"))
private var df: DataFrame = _
@ -52,12 +51,10 @@ class CreateActionTest extends SparkFunSuite with SparkInvolvedSuite with SQLHel
override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(IndexConstants.INDEX_SYSTEM_PATH, indexSystemPath)
when(mockLogManager.getLatestLog()).thenReturn(None)
when(mockLogManager.getLatestId()).thenReturn(None)
import spark.implicits._
FileUtils.delete(new Path(indexSystemPath))
FileUtils.delete(new Path(sampleParquetDataLocation))
val dfFromSample = sampleData.toDF("Date", "RGUID", "Query", "imprs", "clicks")


@ -29,8 +29,8 @@ import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, CREATING}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager
class RefreshActionTest extends SparkFunSuite with SparkInvolvedSuite {
private val sampleParquetDataLocation = "src/test/resources/sampleparquet"
class RefreshActionTest extends HyperspaceSuite {
private val sampleParquetDataLocation = inTempDir("sampleparquet")
private val fileSystem = new Path(sampleParquetDataLocation).getFileSystem(new Configuration)
private val mockLogManager: IndexLogManager = mock(classOf[IndexLogManager])
private val mockDataManager: IndexDataManager = mock(classOf[IndexDataManager])


@ -39,6 +39,9 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
import com.microsoft.hyperspace.BuildInfo
import com.microsoft.hyperspace.util.SparkTestShims.SimpleExplainCommand
// scalastyle:off filelinelengthchecker
/**
* Check that TPC-DS SparkPlans don't change.
@ -255,7 +258,7 @@ trait PlanStabilitySuite extends TPCDSBase with SQLHelper with Logging {
}
def explainString(queryExecution: QueryExecution): String = {
val explain = ExplainCommand(queryExecution.logical, extended = false)
val explain = SimpleExplainCommand(queryExecution.logical)
spark.sessionState
.executePlan(explain)
.executedPlan
@ -269,13 +272,15 @@ trait PlanStabilitySuite extends TPCDSBase with SQLHelper with Logging {
* Spark Only Suite.
*/
class TPCDSV1_4_SparkPlanStabilitySuite extends PlanStabilitySuite {
override val goldenFilePath: String =
new File(baseResourcePath, "spark-2.4/approved-plans-v1_4").getAbsolutePath
override val goldenFilePath: String = {
new File(baseResourcePath, s"spark-${BuildInfo.sparkShortVersion}/approved-plans-v1_4").getAbsolutePath
}
// Enable cross join because some queries fail during query optimization phase.
withSQLConf("spark.sql.crossJoin.enabled" -> "true") {
tpcdsQueries.foreach { q =>
test(s"check simplified (tpcds-v1.4/$q)") {
assume(BuildInfo.sparkShortVersion == "2.4") // TODO: support Spark 3.0
testQuery("tpcds/queries", q)
}
}


@ -230,14 +230,14 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
"store" ->
"""
|`s_store_sk` INT,
|`s_store_id` CHAR(16),
|`s_store_id` STRING,
|`s_rec_start_date` DATE,
|`s_rec_end_date` DATE,
|`s_closed_date_sk` INT,
|`s_store_name` VARCHAR(50),
|`s_number_employees` INT,
|`s_floor_space` INT,
|`s_hours` CHAR(20),
|`s_hours` STRING,
|`s_manager` VARCHAR(40),
|`s_market_id` INT,
|`s_geography_class` VARCHAR(100),
@ -249,12 +249,12 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
|`s_company_name` VARCHAR(50),
|`s_street_number` VARCHAR(10),
|`s_street_name` VARCHAR(60),
|`s_street_type` CHAR(15),
|`s_suite_number` CHAR(10),
|`s_street_type` STRING,
|`s_suite_number` STRING,
|`s_city` VARCHAR(60),
|`s_county` VARCHAR(30),
|`s_state` CHAR(2),
|`s_zip` CHAR(10),
|`s_state` STRING,
|`s_zip` STRING,
|`s_country` VARCHAR(20),
|`s_gmt_offset` DECIMAL(5,2),
|`s_tax_percentage` DECIMAL(5,2)
@ -262,7 +262,7 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
"call_center" ->
"""
|`cc_call_center_sk` INT,
|`cc_call_center_id` CHAR(16),
|`cc_call_center_id` STRING,
|`cc_rec_start_date` DATE,
|`cc_rec_end_date` DATE,
|`cc_closed_date_sk` INT,
@ -271,24 +271,24 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
|`cc_class` VARCHAR(50),
|`cc_employees` INT,
|`cc_sq_ft` INT,
|`cc_hours` CHAR(20),
|`cc_hours` STRING,
|`cc_manager` VARCHAR(40),
|`cc_mkt_id` INT,
|`cc_mkt_class` CHAR(50),
|`cc_mkt_class` STRING,
|`cc_mkt_desc` VARCHAR(100),
|`cc_market_manager` VARCHAR(40),
|`cc_division` INT,
|`cc_division_name` VARCHAR(50),
|`cc_company` INT,
|`cc_company_name` CHAR(50),
|`cc_street_number` CHAR(10),
|`cc_company_name` STRING,
|`cc_street_number` STRING,
|`cc_street_name` VARCHAR(60),
|`cc_street_type` CHAR(15),
|`cc_suite_number` CHAR(10),
|`cc_street_type` STRING,
|`cc_suite_number` STRING,
|`cc_city` VARCHAR(60),
|`cc_county` VARCHAR(30),
|`cc_state` CHAR(2),
|`cc_zip` CHAR(10),
|`cc_state` STRING,
|`cc_zip` STRING,
|`cc_country` VARCHAR(20),
|`cc_gmt_offset` DECIMAL(5,2),
|`cc_tax_percentage` DECIMAL(5,2)
@ -296,7 +296,7 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
"catalog_page" ->
"""
|`cp_catalog_page_sk` INT,
|`cp_catalog_page_id` CHAR(16),
|`cp_catalog_page_id` STRING,
|`cp_start_date_sk` INT,
|`cp_end_date_sk` INT,
|`cp_department` VARCHAR(50),
@ -308,7 +308,7 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
"web_site" ->
"""
|`web_site_sk` INT,
|`web_site_id` CHAR(16),
|`web_site_id` STRING,
|`web_rec_start_date` DATE,
|`web_rec_end_date` DATE,
|`web_name` VARCHAR(50),
@ -321,15 +321,15 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
|`web_mkt_desc` VARCHAR(100),
|`web_market_manager` VARCHAR(40),
|`web_company_id` INT,
|`web_company_name` CHAR(50),
|`web_street_number` CHAR(10),
|`web_company_name` STRING,
|`web_street_number` STRING,
|`web_street_name` VARCHAR(60),
|`web_street_type` CHAR(15),
|`web_suite_number` CHAR(10),
|`web_street_type` STRING,
|`web_suite_number` STRING,
|`web_city` VARCHAR(60),
|`web_county` VARCHAR(30),
|`web_state` CHAR(2),
|`web_zip` CHAR(10),
|`web_state` STRING,
|`web_zip` STRING,
|`web_country` VARCHAR(20),
|`web_gmt_offset` DECIMAL(5,2),
|`web_tax_percentage` DECIMAL(5,2)
@ -337,15 +337,15 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
"web_page" ->
"""
|`wp_web_page_sk` INT,
|`wp_web_page_id` CHAR(16),
|`wp_web_page_id` STRING,
|`wp_rec_start_date` DATE,
|`wp_rec_end_date` DATE,
|`wp_creation_date_sk` INT,
|`wp_access_date_sk` INT,
|`wp_autogen_flag` CHAR(1),
|`wp_autogen_flag` STRING,
|`wp_customer_sk` INT,
|`wp_url` VARCHAR(100),
|`wp_type` CHAR(50),
|`wp_type` STRING,
|`wp_char_count` INT,
|`wp_link_count` INT,
|`wp_image_count` INT,
@ -354,65 +354,65 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
"warehouse" ->
"""
|`w_warehouse_sk` INT,
|`w_warehouse_id` CHAR(16),
|`w_warehouse_id` STRING,
|`w_warehouse_name` VARCHAR(20),
|`w_warehouse_sq_ft` INT,
|`w_street_number` CHAR(10),
|`w_street_number` STRING,
|`w_street_name` VARCHAR(20),
|`w_street_type` CHAR(15),
|`w_suite_number` CHAR(10),
|`w_street_type` STRING,
|`w_suite_number` STRING,
|`w_city` VARCHAR(60),
|`w_county` VARCHAR(30),
|`w_state` CHAR(2),
|`w_zip` CHAR(10),
|`w_state` STRING,
|`w_zip` STRING,
|`w_country` VARCHAR(20),
|`w_gmt_offset` DECIMAL(5,2)
""".stripMargin,
"customer" ->
"""
|`c_customer_sk` INT,
|`c_customer_id` CHAR(16),
|`c_customer_id` STRING,
|`c_current_cdemo_sk` INT,
|`c_current_hdemo_sk` INT,
|`c_current_addr_sk` INT,
|`c_first_shipto_date_sk` INT,
|`c_first_sales_date_sk` INT,
|`c_salutation` CHAR(10),
|`c_first_name` CHAR(20),
|`c_last_name` CHAR(30),
|`c_preferred_cust_flag` CHAR(1),
|`c_salutation` STRING,
|`c_first_name` STRING,
|`c_last_name` STRING,
|`c_preferred_cust_flag` STRING,
|`c_birth_day` INT,
|`c_birth_month` INT,
|`c_birth_year` INT,
|`c_birth_country` VARCHAR(20),
|`c_login` CHAR(13),
|`c_email_address` CHAR(50),
|`c_login` STRING,
|`c_email_address` STRING,
|`c_last_review_date` INT
""".stripMargin,
"customer_address" ->
"""
|`ca_address_sk` INT,
|`ca_address_id` CHAR(16),
|`ca_street_number` CHAR(10),
|`ca_address_id` STRING,
|`ca_street_number` STRING,
|`ca_street_name` VARCHAR(60),
|`ca_street_type` CHAR(15),
|`ca_suite_number` CHAR(10),
|`ca_street_type` STRING,
|`ca_suite_number` STRING,
|`ca_city` VARCHAR(60),
|`ca_county` VARCHAR(30),
|`ca_state` CHAR(2),
|`ca_zip` CHAR(10),
|`ca_state` STRING,
|`ca_zip` STRING,
|`ca_country` VARCHAR(20),
|`ca_gmt_offset` DECIMAL(5,2),
|`ca_location_type` CHAR(20)
|`ca_location_type` STRING
""".stripMargin,
"customer_demographics" ->
"""
|`cd_demo_sk` INT,
|`cd_gender` CHAR(1),
|`cd_marital_status` CHAR(1),
|`cd_education_status` CHAR(20),
|`cd_gender` STRING,
|`cd_marital_status` STRING,
|`cd_education_status` STRING,
|`cd_purchase_estimate` INT,
|`cd_credit_rating` CHAR(10),
|`cd_credit_rating` STRING,
|`cd_dep_count` INT,
|`cd_dep_employed_count` INT,
|`cd_dep_college_count` INT
@ -420,7 +420,7 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
"date_dim" ->
"""
|`d_date_sk` INT,
|`d_date_id` CHAR(16),
|`d_date_id` STRING,
|`d_date` DATE,
|`d_month_seq` INT,
|`d_week_seq` INT,
@ -433,53 +433,53 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
|`d_fy_year` INT,
|`d_fy_quarter_seq` INT,
|`d_fy_week_seq` INT,
|`d_day_name` CHAR(9),
|`d_quarter_name` CHAR(1),
|`d_holiday` CHAR(1),
|`d_weekend` CHAR(1),
|`d_following_holiday` CHAR(1),
|`d_day_name` STRING,
|`d_quarter_name` STRING,
|`d_holiday` STRING,
|`d_weekend` STRING,
|`d_following_holiday` STRING,
|`d_first_dom` INT,
|`d_last_dom` INT,
|`d_same_day_ly` INT,
|`d_same_day_lq` INT,
|`d_current_day` CHAR(1),
|`d_current_week` CHAR(1),
|`d_current_month` CHAR(1),
|`d_current_quarter` CHAR(1),
|`d_current_year` CHAR(1)
|`d_current_day` STRING,
|`d_current_week` STRING,
|`d_current_month` STRING,
|`d_current_quarter` STRING,
|`d_current_year` STRING
""".stripMargin,
"household_demographics" ->
"""
|`hd_demo_sk` INT,
|`hd_income_band_sk` INT,
|`hd_buy_potential` CHAR(15),
|`hd_buy_potential` STRING,
|`hd_dep_count` INT,
|`hd_vehicle_count` INT
""".stripMargin,
"item" ->
"""
|`i_item_sk` INT,
|`i_item_id` CHAR(16),
|`i_item_id` STRING,
|`i_rec_start_date` DATE,
|`i_rec_end_date` DATE,
|`i_item_desc` VARCHAR(200),
|`i_current_price` DECIMAL(7,2),
|`i_wholesale_cost` DECIMAL(7,2),
|`i_brand_id` INT,
|`i_brand` CHAR(50),
|`i_brand` STRING,
|`i_class_id` INT,
|`i_class` CHAR(50),
|`i_class` STRING,
|`i_category_id` INT,
|`i_category` CHAR(50),
|`i_category` STRING,
|`i_manufact_id` INT,
|`i_manufact` CHAR(50),
|`i_size` CHAR(20),
|`i_formulation` CHAR(20),
|`i_color` CHAR(20),
|`i_units` CHAR(10),
|`i_container` CHAR(10),
|`i_manufact` STRING,
|`i_size` STRING,
|`i_formulation` STRING,
|`i_color` STRING,
|`i_units` STRING,
|`i_container` STRING,
|`i_manager_id` INT,
|`i_product_name` CHAR(50)
|`i_product_name` STRING
""".stripMargin,
"income_band" ->
"""
@ -490,52 +490,52 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
"promotion" ->
"""
|`p_promo_sk` INT,
|`p_promo_id` CHAR(16),
|`p_promo_id` STRING,
|`p_start_date_sk` INT,
|`p_end_date_sk` INT,
|`p_item_sk` INT,
|`p_cost` DECIMAL(15,2),
|`p_response_target` INT,
|`p_promo_name` CHAR(50),
|`p_channel_dmail` CHAR(1),
|`p_channel_email` CHAR(1),
|`p_channel_catalog` CHAR(1),
|`p_channel_tv` CHAR(1),
|`p_channel_radio` CHAR(1),
|`p_channel_press` CHAR(1),
|`p_channel_event` CHAR(1),
|`p_channel_demo` CHAR(1),
|`p_promo_name` STRING,
|`p_channel_dmail` STRING,
|`p_channel_email` STRING,
|`p_channel_catalog` STRING,
|`p_channel_tv` STRING,
|`p_channel_radio` STRING,
|`p_channel_press` STRING,
|`p_channel_event` STRING,
|`p_channel_demo` STRING,
|`p_channel_details` VARCHAR(100),
|`p_purpose` CHAR(15),
|`p_discount_active` CHAR(1)
|`p_purpose` STRING,
|`p_discount_active` STRING
""".stripMargin,
"reason" ->
"""
|`r_reason_sk` INT,
|`r_reason_id` CHAR(16),
|`r_reason_desc` CHAR(100)
|`r_reason_id` STRING,
|`r_reason_desc` STRING
""".stripMargin,
"ship_mode" ->
"""
|`sm_ship_mode_sk` INT,
|`sm_ship_mode_id` CHAR(16),
|`sm_type` CHAR(30),
|`sm_code` CHAR(10),
|`sm_carrier` CHAR(20),
|`sm_contract` CHAR(20)
|`sm_ship_mode_id` STRING,
|`sm_type` STRING,
|`sm_code` STRING,
|`sm_carrier` STRING,
|`sm_contract` STRING
""".stripMargin,
"time_dim" ->
"""
|`t_time_sk` INT,
|`t_time_id` CHAR(16),
|`t_time_id` STRING,
|`t_time` INT,
|`t_hour` INT,
|`t_minute` INT,
|`t_second` INT,
|`t_am_pm` CHAR(2),
|`t_shift` CHAR(20),
|`t_sub_shift` CHAR(20),
|`t_meal_time` CHAR(20)
|`t_am_pm` STRING,
|`t_shift` STRING,
|`t_sub_shift` STRING,
|`t_meal_time` STRING
""".stripMargin
)


@ -29,10 +29,9 @@ import com.microsoft.hyperspace.util.FileUtils
import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn
class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
override val systemPath = new Path("src/test/resources/indexLocation")
private val testDir = "src/test/resources/createIndexTests/"
private val nonPartitionedDataPath = testDir + "samplenestedparquet"
private val partitionedDataPath = testDir + "samplenestedpartitionedparquet"
private val testDir = inTempDir("createIndexTests")
private val nonPartitionedDataPath = testDir + "/samplenestedparquet"
private val partitionedDataPath = testDir + "/samplenestedpartitionedparquet"
private val partitionKeys = Seq("Date", "Query")
private val indexConfig1 =
IndexConfig("index1", Seq("nested.leaf.id"), Seq("Date", "nested.leaf.cnt"))


@ -27,10 +27,9 @@ import com.microsoft.hyperspace.{BuildInfo, Hyperspace, HyperspaceException, Sam
import com.microsoft.hyperspace.util.FileUtils
class CreateIndexTest extends HyperspaceSuite with SQLHelper {
override val systemPath = new Path("src/test/resources/indexLocation")
private val testDir = "src/test/resources/createIndexTests/"
private val nonPartitionedDataPath = testDir + "sampleparquet"
private val partitionedDataPath = testDir + "samplepartitionedparquet"
private val testDir = inTempDir("createIndexTests")
private val nonPartitionedDataPath = testDir + "/sampleparquet"
private val partitionedDataPath = testDir + "/samplepartitionedparquet"
private val partitionKeys = Seq("Date", "Query")
private val indexConfig1 = IndexConfig("index1", Seq("RGUID"), Seq("Date"))
private val indexConfig2 = IndexConfig("index2", Seq("Query"), Seq("imprs"))


@ -32,11 +32,11 @@ import com.microsoft.hyperspace.{SampleData, SparkInvolvedSuite}
import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
import com.microsoft.hyperspace.util.FileUtils
class DataFrameWriterExtensionsTest extends SparkFunSuite with SparkInvolvedSuite {
class DataFrameWriterExtensionsTest extends HyperspaceSuite {
private val sampleData = SampleData.testData
private val sampleParquetDataLocation = "src/test/resources/sampleparquet"
private val sampleDataBucketedLocation = "src/test/resources/sampleparquet-withBuckets"
private val sampleParquetDataLocation = inTempDir("sampleparquet")
private val sampleDataBucketedLocation = inTempDir("sampleparquet-withBuckets")
private var df: DataFrame = _


@ -34,7 +34,7 @@ import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation
import com.microsoft.hyperspace.util.PathUtils
class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
override val systemPath = new Path("src/test/resources/deltaLakeIntegrationTest")
override val indexLocationDirName = "deltaLakeIntegrationTest"
private val sampleData = SampleData.testData
private var hyperspace: Hyperspace = _


@ -33,10 +33,9 @@ import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule}
import com.microsoft.hyperspace.util.PathUtils
class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
private val testDir = "src/test/resources/e2eTests/"
private val nonPartitionedDataPath = testDir + "sampleparquet"
private val partitionedDataPath = testDir + "samplepartitionedparquet"
override val systemPath = PathUtils.makeAbsolute("src/test/resources/indexLocation")
private val testDir = inTempDir("e2eTests")
private val nonPartitionedDataPath = testDir + "/sampleparquet"
private val partitionedDataPath = testDir + "/samplepartitionedparquet"
private val fileSystem = new Path(nonPartitionedDataPath).getFileSystem(new Configuration)
private var hyperspace: Hyperspace = _


@ -27,7 +27,7 @@ import com.microsoft.hyperspace.util.FileUtils
// Hybrid Scan tests for non-partitioned source data. The test cases of HybridScanSuite are also
// executed against non-partitioned source data.
class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
override val systemPath = new Path("src/test/resources/hybridScanTest")
override val indexLocationDirName = "hybridScanTest"
override def beforeAll(): Unit = {
super.beforeAll()


@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, QueryTest}
import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo, In, InSet, Literal, Not}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, RepartitionByExpression, Union}
import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec, UnionExec}
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, UnionExec}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.internal.SQLConf
@ -33,7 +33,7 @@ import com.microsoft.hyperspace.index.plans.logical.BucketUnion
import com.microsoft.hyperspace.util.FileUtils
trait HybridScanSuite extends QueryTest with HyperspaceSuite {
override val systemPath = new Path("src/test/resources/hybridScanTest")
override val indexLocationDirName = "hybridScanTest"
val sampleData = SampleData.testData
var hyperspace: Hyperspace = _
@ -85,6 +85,11 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
before {
spark.conf.set(IndexConstants.INDEX_LINEAGE_ENABLED, "true")
// Dynamic pruning creates a dynamic filter with different ids every time,
// making basePlan.equals(join) fail.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")
spark.enableHyperspace()
}
@ -284,7 +289,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
case p @ BucketUnionExec(children, bucketSpec) =>
assert(children.size === 2)
// children.head is always the index plan.
assert(children.head.isInstanceOf[ProjectExec])
assert(children.head.isInstanceOf[ProjectExec] || children.head.isInstanceOf[FilterExec])
assert(children.last.isInstanceOf[ShuffleExchangeExec])
assert(bucketSpec.numBuckets === 200)
p


@ -17,22 +17,39 @@
package com.microsoft.hyperspace.index
import java.io.File
import java.nio.charset.StandardCharsets
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkFunSuite
import org.apache.spark.util.hyperspace.Utils
import org.scalatest.{BeforeAndAfterAllConfigMap, ConfigMap}
import com.microsoft.hyperspace.{Hyperspace, SparkInvolvedSuite}
import com.microsoft.hyperspace.util.FileUtils
import com.microsoft.hyperspace.{BuildInfo, Hyperspace, SparkInvolvedSuite}
import com.microsoft.hyperspace.util.{FileUtils, PathUtils}
trait HyperspaceSuite
extends SparkFunSuite
with SparkInvolvedSuite
with BeforeAndAfterAllConfigMap {
// Needed to resolve conflicts between BeforeAndAfterAll and BeforeAndAfterAllConfigMap
override val invokeBeforeAllAndAfterAllEvenIfNoTestsAreExpected = false
// Temporary directory
lazy val tempDir: Path = new Path(Utils.createTempDir().getAbsolutePath)
// Returns a path under the temporary directory for this test.
def inTempDir(path: String): String = new Path(tempDir, path).toString
val indexLocationDirName: String = "indexLocation"
trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite {
// This is the system path that PathResolver uses to get the root of the indexes.
// Each test suite that extends HyperspaceSuite should define this.
val systemPath: Path
lazy val systemPath: Path = PathUtils.makeAbsolute(inTempDir(indexLocationDirName))
override def beforeAll(): Unit = {
override def beforeAll(cm: ConfigMap): Unit = {
super.beforeAll()
FileUtils.delete(systemPath)
FileUtils.delete(tempDir)
spark.conf.set(IndexConstants.INDEX_SYSTEM_PATH, systemPath.toUri.toString)
clearCache()
}
@ -40,7 +57,7 @@ trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite {
override def afterAll(): Unit = {
clearCache()
spark.conf.unset(IndexConstants.INDEX_SYSTEM_PATH)
FileUtils.delete(systemPath)
FileUtils.delete(tempDir)
super.afterAll()
}
@ -90,20 +107,6 @@ trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite {
}
}
/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns. This is copied from SparkFunSuite.scala in Spark 3.0.
*
* TODO: This can be removed when we support Spark 3.0.
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir()
try f(dir)
finally {
Utils.deleteRecursively(dir)
}
}
protected def withTempPathAsString(f: String => Unit): Unit = {
// The following is from SQLHelper.withTempPath with a modification to pass
// String instead of File to "f". The reason this is copied instead of extending
@ -119,4 +122,10 @@ trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite {
try f(pathStr)
finally Utils.deleteRecursively(path)
}
def getExpectedResult(name: String): String = {
org.apache.commons.io.FileUtils.readFileToString(
new File(s"src/test/resources/expected/spark-${BuildInfo.sparkShortVersion}", name),
StandardCharsets.UTF_8)
}
}
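
`HyperspaceSuite` now derives `systemPath` from `indexLocationDirName` under a per-suite temp directory, and `getExpectedResult` loads golden files from the per-Spark-version `expected/` resources. A hedged sketch of how a suite can combine the two (the suite name, index name, and substitution value are illustrative, not taken from this change):

```scala
import com.microsoft.hyperspace.index.HyperspaceSuite

class ExampleGoldenFileSuite extends HyperspaceSuite {
  // Index data for this suite is written under <tempDir>/exampleGoldenFileTest.
  override val indexLocationDirName = "exampleGoldenFileTest"

  test("explain output matches the golden file for the current Spark version") {
    // Resolved against src/test/resources/expected/spark-<BuildInfo.sparkShortVersion>.
    val expected = getExpectedResult("selfJoin_Iceberg.txt")
      // Placeholders keep golden files environment-independent; real suites
      // substitute the actual index location here.
      .replace("$joinIndexPath", inTempDir("exampleGoldenFileTest/joinIndex"))
    // Run the query under test and compare its explain output with `expected`,
    // e.g. via verifyExplainOutput as in the explain-related suites in this change.
    assert(expected.nonEmpty)
  }
}
```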


@ -35,7 +35,7 @@ import com.microsoft.hyperspace.util.PathUtils
import com.microsoft.hyperspace.util.PathUtils.DataPathFilter
class IcebergIntegrationTest extends QueryTest with HyperspaceSuite {
override val systemPath = PathUtils.makeAbsolute("src/test/resources/icebergIntegrationTest")
override val indexLocationDirName = "icebergIntegrationTest"
private val sampleData = SampleData.testData
private var hyperspace: Hyperspace = _
@ -46,6 +46,7 @@ class IcebergIntegrationTest extends QueryTest with HyperspaceSuite {
"spark.hyperspace.index.sources.fileBasedBuilders",
"com.microsoft.hyperspace.index.sources.iceberg.IcebergFileBasedSourceBuilder," +
"com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder")
spark.conf.set("spark.sql.legacy.bucketedTableScan.outputOrdering", true) // For Spark 3.0
hyperspace = new Hyperspace(spark)
}
@ -356,109 +357,14 @@ class IcebergIntegrationTest extends QueryTest with HyperspaceSuite {
val indexConfig = IndexConfig("joinIndex", Seq("Col1"), Seq("Col2"))
hyperspace.createIndex(iceDf, indexConfig)
val defaultDisplayMode = new PlainTextMode(getHighlightConf("", ""))
// Constructing expected output for given query from explain API
val expectedOutput = new StringBuilder
val joinIndexFilePath = getIndexFilesPath("joinIndex")
val joinIndexPath = getIndexRootPath("joinIndex")
// The format of the explain output looks as follows:
// scalastyle:off filelinelengthchecker
expectedOutput
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("Plan with indexes:")
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("SortMergeJoin [Col1#11], [Col1#21], Inner")
.append(defaultDisplayMode.newLine)
.append("<----:- *(1) Project [Col1#11, Col2#12]---->")
.append(defaultDisplayMode.newLine)
.append("<----: +- *(1) Filter isnotnull(Col1#11)---->")
.append(defaultDisplayMode.newLine)
.append(s"<----: +- *(1) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[$joinIndexFilePath]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->")
.append(defaultDisplayMode.newLine)
.append("<----+- *(2) Project [Col1#21, Col2#22]---->")
.append(defaultDisplayMode.newLine)
.append(" <----+- *(2) Filter isnotnull(Col1#21)---->")
.append(defaultDisplayMode.newLine)
.append(s" <----+- *(2) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[$joinIndexFilePath]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->")
.append(defaultDisplayMode.newLine)
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("Plan without indexes:")
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("SortMergeJoin [Col1#11], [Col1#21], Inner")
.append(defaultDisplayMode.newLine)
.append("<----:- *(1) Sort [Col1#11 ASC NULLS FIRST], false, 0---->")
.append(defaultDisplayMode.newLine)
.append("<----: +- *(1) Project [Col1#1, Col2#2]---->")
.append(defaultDisplayMode.newLine)
.append("<----: +- *(1) Filter isnotnull(Col1#1)---->")
.append(defaultDisplayMode.newLine)
.append("<----: +- *(1) ScanV2 iceberg[Col1#, Col2#] (Filters: [isnotnull(Col1#)], Options: " +
truncate(s"[path=$dataPath,paths=[]]") + ")---->")
.append(defaultDisplayMode.newLine)
.append("<----+- *(2) Sort [Col1#21 ASC NULLS FIRST], false, 0---->")
.append(defaultDisplayMode.newLine)
.append(" <----+- *(2) Project [Col1#, Col2#]---->")
.append(defaultDisplayMode.newLine)
.append(" <----+- *(2) Filter isnotnull(Col1#)---->")
.append(defaultDisplayMode.newLine)
.append(" <----+- *(2) ScanV2 iceberg[Col1#, Col2#] (Filters: [isnotnull(Col1#)], Options: " +
truncate(s"[path=$dataPath,paths=[]]") + ")---->")
.append(defaultDisplayMode.newLine)
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("Indexes used:")
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append(s"joinIndex:$joinIndexPath")
.append(defaultDisplayMode.newLine)
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("Physical operator stats:")
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("+----------------------------------------------------------+-------------------+------------------+----------+")
.append(defaultDisplayMode.newLine)
.append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|")
.append(defaultDisplayMode.newLine)
.append("+----------------------------------------------------------+-------------------+------------------+----------+")
.append(defaultDisplayMode.newLine)
.append("| *DataSourceV2Scan| 2| 0| -2|")
.append(defaultDisplayMode.newLine)
.append("|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)| 0| 2| 2|")
.append(defaultDisplayMode.newLine)
.append("| *Sort| 2| 0| -2|")
.append(defaultDisplayMode.newLine)
.append("| Filter| 2| 2| 0|")
.append(defaultDisplayMode.newLine)
.append("| InputAdapter| 2| 2| 0|")
.append(defaultDisplayMode.newLine)
.append("| Project| 2| 2| 0|")
.append(defaultDisplayMode.newLine)
.append("| SortMergeJoin| 1| 1| 0|")
.append(defaultDisplayMode.newLine)
.append("| WholeStageCodegen| 3| 3| 0|")
.append(defaultDisplayMode.newLine)
.append("+----------------------------------------------------------+-------------------+------------------+----------+")
.append(defaultDisplayMode.newLine)
.append(defaultDisplayMode.newLine)
// scalastyle:on filelinelengthchecker
val expectedOutput = getExpectedResult("selfJoin_Iceberg.txt")
.replace("$joinIndexLocation", truncate(s"InMemoryFileIndex[$joinIndexFilePath]"))
.replace("$joinIndexPath", joinIndexPath.toString)
.replace("$icebergOptions", truncate(s"[path=$dataPath,paths=[]]"))
.replace("$icebergPath", dataPath)
val selfJoinDf = iceDf.join(iceDf, iceDf("Col1") === iceDf("Col1"))
verifyExplainOutput(selfJoinDf, expectedOutput.toString(), verbose = true) { df =>


@ -26,18 +26,15 @@ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, SampleData, Sp
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.util.FileUtils
class IndexCacheTest extends SparkFunSuite with SparkInvolvedSuite {
val sampleParquetDataLocation = "src/test/resources/sampleparquet"
val indexSystemPath = "src/test/resources/indexLocation"
class IndexCacheTest extends HyperspaceSuite {
val sampleParquetDataLocation = inTempDir("sampleparquet")
val indexConfig1 = IndexConfig("index1", Seq("RGUID"), Seq("Date"))
before {
FileUtils.delete(new Path(indexSystemPath))
FileUtils.delete(new Path(sampleParquetDataLocation))
}
override def afterAll(): Unit = {
FileUtils.delete(new Path(indexSystemPath))
FileUtils.delete(new Path(sampleParquetDataLocation))
super.afterAll()
}
@ -132,7 +129,6 @@ class IndexCacheTest extends SparkFunSuite with SparkInvolvedSuite {
dfFromSample.write.parquet(sampleParquetDataLocation)
val df = spark.read.parquet(sampleParquetDataLocation)
spark.conf.set(IndexConstants.INDEX_SYSTEM_PATH, indexSystemPath)
val hyperspace = new Hyperspace(spark)
hyperspace.createIndex(df, indexConfig1)


@ -26,8 +26,7 @@ import com.microsoft.hyperspace.{HyperspaceException, SparkInvolvedSuite}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexConstants.{REFRESH_MODE_FULL, REFRESH_MODE_INCREMENTAL}
class IndexCollectionManagerTest extends SparkFunSuite with SparkInvolvedSuite {
private val indexSystemPath = "src/test/resources/indexLocation"
class IndexCollectionManagerTest extends HyperspaceSuite {
private val testLogManagerFactory: IndexLogManagerFactory = new IndexLogManagerFactory {
override def create(indexPath: Path, hadoopConfiguration: Configuration): IndexLogManager =
new IndexLogManager {
@ -70,7 +69,6 @@ class IndexCollectionManagerTest extends SparkFunSuite with SparkInvolvedSuite {
override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(IndexConstants.INDEX_SYSTEM_PATH, indexSystemPath)
when(mockFileSystemFactory.create(any[Path], any[Configuration])).thenReturn(mockFileSystem)
indexCollectionManager = new IndexCollectionManager(
@ -121,27 +119,27 @@ class IndexCollectionManagerTest extends SparkFunSuite with SparkInvolvedSuite {
}
test("delete() throws exception if index is not found") {
when(mockFileSystem.exists(new Path(indexSystemPath, "idx4"))).thenReturn(false)
when(mockFileSystem.exists(new Path(systemPath, "idx4"))).thenReturn(false)
intercept[HyperspaceException](indexCollectionManager.delete("idx4"))
}
test("vacuum() throws exception if index is not found") {
when(mockFileSystem.exists(new Path(indexSystemPath, "idx4"))).thenReturn(false)
when(mockFileSystem.exists(new Path(systemPath, "idx4"))).thenReturn(false)
intercept[HyperspaceException](indexCollectionManager.vacuum("idx4"))
}
test("restore() throws exception if index is not found") {
when(mockFileSystem.exists(new Path(indexSystemPath, "idx4"))).thenReturn(false)
when(mockFileSystem.exists(new Path(systemPath, "idx4"))).thenReturn(false)
intercept[HyperspaceException](indexCollectionManager.restore("idx4"))
}
test("refresh() with mode = 'full' throws exception if index is not found") {
when(mockFileSystem.exists(new Path(indexSystemPath, "idx4"))).thenReturn(false)
when(mockFileSystem.exists(new Path(systemPath, "idx4"))).thenReturn(false)
intercept[HyperspaceException](indexCollectionManager.refresh("idx4", REFRESH_MODE_FULL))
}
test("refresh() with mode = 'incremental' throws exception if index is not found") {
when(mockFileSystem.exists(new Path(indexSystemPath, "idx4"))).thenReturn(false)
when(mockFileSystem.exists(new Path(systemPath, "idx4"))).thenReturn(false)
intercept[HyperspaceException](
indexCollectionManager.refresh("idx4", REFRESH_MODE_INCREMENTAL))
}


@ -32,7 +32,7 @@ import com.microsoft.hyperspace.{BuildInfo, HyperspaceException, TestUtils}
import com.microsoft.hyperspace.index.IndexConstants.UNKNOWN_FILE_ID
import com.microsoft.hyperspace.util.{JsonUtils, PathUtils}
class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter {
class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
var testDir: file.Path = _
var f1: file.Path = _
var f2: file.Path = _
@ -44,9 +44,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
var fileIdTracker: FileIdTracker = _
override def beforeAll(): Unit = {
val testDirPath = Paths.get("src/test/resources/testDir")
val testDirPath = Paths.get(inTempDir("testDir"))
if (Files.exists(testDirPath)) {
FileUtils.deleteDirectory(new File("src/test/resources/testDir"))
FileUtils.deleteDirectory(new File(inTempDir("testDir")))
}
testDir = Files.createDirectories(testDirPath)


@ -27,11 +27,8 @@ import com.microsoft.hyperspace.{SparkInvolvedSuite, TestUtils}
import com.microsoft.hyperspace.index.IndexConstants.HYPERSPACE_LOG
import com.microsoft.hyperspace.util.{FileUtils, JsonUtils}
class IndexLogManagerImplTest
extends SparkFunSuite
with SparkInvolvedSuite
with BeforeAndAfterAll {
val testRoot = "src/test/resources/indexLogManagerTests"
class IndexLogManagerImplTest extends HyperspaceSuite {
val testRoot = inTempDir("indexLogManagerTests")
val sampleIndexLogEntry: IndexLogEntry = IndexLogEntry(
"entityName",
CoveringIndex(


@ -32,8 +32,7 @@ import com.microsoft.hyperspace.telemetry.OptimizeActionEvent
import com.microsoft.hyperspace.util.{FileUtils, PathUtils}
class IndexManagerTest extends HyperspaceSuite with SQLHelper {
private val sampleParquetDataLocation = "src/test/resources/sampleparquet"
override val systemPath = PathUtils.makeAbsolute("src/test/resources/indexLocation")
private def sampleParquetDataLocation = inTempDir("sampleparquet")
private val indexConfig1 = IndexConfig("index1", Seq("RGUID"), Seq("Date"))
private val indexConfig2 = IndexConfig("index2", Seq("Query"), Seq("imprs"))
private lazy val hyperspace: Hyperspace = new Hyperspace(spark)
@ -342,7 +341,7 @@ class IndexManagerTest extends HyperspaceSuite with SQLHelper {
// 3. Call optimize. Check the metadata. It should not contain small index files created
// during refresh operations.
withTempPathAsString { testPath =>
withSQLConf(OPTIMIZE_FILE_SIZE_THRESHOLD -> "900") {
withSQLConf(OPTIMIZE_FILE_SIZE_THRESHOLD -> "910") {
val indexConfig = IndexConfig("index", Seq("RGUID"), Seq("imprs"))
import spark.implicits._
val smallData = SampleData.testData


@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import com.microsoft.hyperspace.SparkInvolvedSuite
import com.microsoft.hyperspace.shim.JoinWithoutHint
class IndexSignatureProviderTest extends SparkFunSuite with SparkInvolvedSuite {
private val fileLength1 = 100
@ -113,7 +114,7 @@ class IndexSignatureProviderTest extends SparkFunSuite with SparkInvolvedSuite {
t2Schema)
val joinCondition = EqualTo(t1c3, t2c2)
val joinNode = Join(r1, r2, JoinType("inner"), Some(joinCondition))
val joinNode = JoinWithoutHint(r1, r2, JoinType("inner"), Some(joinCondition))
val filterCondition = And(EqualTo(t1c1, Literal("ABC")), IsNotNull(t1c1))
val filterNode = Filter(filterCondition, joinNode)


@ -25,9 +25,9 @@ import com.microsoft.hyperspace.TestUtils.logManager
import com.microsoft.hyperspace.util.FileUtils
class IndexStatisticsTest extends QueryTest with HyperspaceSuite {
override val systemPath = new Path("src/test/resources/indexStatsTest")
override val indexLocationDirName = "indexStatsTest"
private val dataColumns = Seq("Date", "RGUID", "Query", "imprs", "clicks")
private val dataPath = "src/test/resources/data/sampleparquet"
private val dataPath = inTempDir("sampleparquet")
private val indexConfig = IndexConfig("index1", Seq("RGUID"), Seq("Date"))
private var dataDF: DataFrame = _
private var hyperspace: Hyperspace = _


@ -32,10 +32,9 @@ import com.microsoft.hyperspace.util.PathUtils.DataPathFilter
* Unit E2E test cases for RefreshIndex.
*/
class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite {
override val systemPath = new Path("src/test/resources/indexLocation")
private val testDir = "src/test/resources/RefreshIndexDeleteTests/"
private val nonPartitionedDataPath = testDir + "nonpartitioned"
private val partitionedDataPath = testDir + "partitioned"
private val testDir = inTempDir("RefreshIndexDeleteTests")
private val nonPartitionedDataPath = testDir + "/nonpartitioned"
private val partitionedDataPath = testDir + "/partitioned"
private val indexConfig = IndexConfig("index1", Seq("nested.leaf.id"), Seq("nested.leaf.cnt"))
private var hyperspace: Hyperspace = _


@ -32,10 +32,9 @@ import com.microsoft.hyperspace.util.PathUtils.DataPathFilter
* Unit E2E test cases for RefreshIndex.
*/
class RefreshIndexTest extends QueryTest with HyperspaceSuite {
override val systemPath = new Path("src/test/resources/indexLocation")
private val testDir = "src/test/resources/RefreshIndexDeleteTests/"
private val nonPartitionedDataPath = testDir + "nonpartitioned"
private val partitionedDataPath = testDir + "partitioned"
private val testDir = inTempDir("RefreshIndexDeleteTests")
private val nonPartitionedDataPath = testDir + "/nonpartitioned"
private val partitionedDataPath = testDir + "/partitioned"
private val indexConfig = IndexConfig("index1", Seq("Query"), Seq("imprs"))
private var hyperspace: Hyperspace = _


@ -28,8 +28,7 @@ import com.microsoft.hyperspace.util.PathUtils
import com.microsoft.hyperspace.util.PathUtils.DataPathFilter
class ExplainTest extends SparkFunSuite with HyperspaceSuite {
private val sampleParquetDataLocation = "src/test/resources/sampleparquet"
override val systemPath = PathUtils.makeAbsolute("src/test/resources/indexLocation")
private val sampleParquetDataLocation = inTempDir("sampleparquet")
private val fileSystem = new Path(sampleParquetDataLocation).getFileSystem(new Configuration)
private var sampleParquetDataFullPath: String = ""
private var hyperspace: Hyperspace = _
@ -38,6 +37,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
super.beforeAll()
val sparkSession = spark
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.legacy.bucketedTableScan.outputOrdering", true) // For Spark 3.0
import sparkSession.implicits._
hyperspace = new Hyperspace(sparkSession)
@ -67,112 +67,15 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
val indexConfig = IndexConfig("joinIndex", Seq("Col1"), Seq("Col2"))
hyperspace.createIndex(df, indexConfig)
val defaultDisplayMode = new PlainTextMode(getHighlightConf("", ""))
// Constructing expected output for given query from explain API
val expectedOutput = new StringBuilder
val joinIndexFilePath = getIndexFilesPath("joinIndex")
val joinIndexPath = getIndexRootPath("joinIndex")
// The format of the explain output looks as follows:
// scalastyle:off filelinelengthchecker
expectedOutput
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("Plan with indexes:")
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("SortMergeJoin [Col1#11], [Col1#21], Inner")
.append(defaultDisplayMode.newLine)
.append("<----:- *(1) Project [Col1#11, Col2#12]---->")
.append(defaultDisplayMode.newLine)
.append("<----: +- *(1) Filter isnotnull(Col1#11)---->")
.append(defaultDisplayMode.newLine)
.append(s"<----: +- *(1) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[$joinIndexFilePath]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->")
.append(defaultDisplayMode.newLine)
.append("<----+- *(2) Project [Col1#21, Col2#22]---->")
.append(defaultDisplayMode.newLine)
.append(" <----+- *(2) Filter isnotnull(Col1#21)---->")
.append(defaultDisplayMode.newLine)
.append(s" <----+- *(2) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[$joinIndexFilePath]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->")
.append(defaultDisplayMode.newLine)
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("Plan without indexes:")
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("SortMergeJoin [Col1#11], [Col1#21], Inner")
.append(defaultDisplayMode.newLine)
.append("<----:- *(2) Sort [Col1#11 ASC NULLS FIRST], false, 0---->")
.append(defaultDisplayMode.newLine)
.append("<----: +- Exchange hashpartitioning(Col1#11, 5)---->")
.append(defaultDisplayMode.newLine)
.append("<----: +- *(1) Project [Col1#11, Col2#12]---->")
.append(defaultDisplayMode.newLine)
.append("<----: +- *(1) Filter isnotnull(Col1#11)---->")
.append(defaultDisplayMode.newLine)
.append(s"<----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>---->")
.append(defaultDisplayMode.newLine)
.append("<----+- *(4) Sort [Col1#21 ASC NULLS FIRST], false, 0---->")
.append(defaultDisplayMode.newLine)
.append(" <----+- ReusedExchange [Col1#21, Col2#22], Exchange hashpartitioning(Col1#11, 5)---->")
.append(defaultDisplayMode.newLine)
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("Indexes used:")
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append(s"joinIndex:$joinIndexPath")
.append(defaultDisplayMode.newLine)
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("Physical operator stats:")
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("+----------------------------------------------------------+-------------------+------------------+----------+")
.append(defaultDisplayMode.newLine)
.append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|")
.append(defaultDisplayMode.newLine)
.append("+----------------------------------------------------------+-------------------+------------------+----------+")
.append(defaultDisplayMode.newLine)
.append("| *Filter| 1| 2| 1|")
.append(defaultDisplayMode.newLine)
.append("| *InputAdapter| 4| 2| -2|")
.append(defaultDisplayMode.newLine)
.append("| *Project| 1| 2| 1|")
.append(defaultDisplayMode.newLine)
.append("| *ReusedExchange| 1| 0| -1|")
.append(defaultDisplayMode.newLine)
.append("|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)| 0| 2| 2|")
.append(defaultDisplayMode.newLine)
.append("| *Scan parquet| 1| 0| -1|")
.append(defaultDisplayMode.newLine)
.append("| *ShuffleExchange| 1| 0| -1|")
.append(defaultDisplayMode.newLine)
.append("| *Sort| 2| 0| -2|")
.append(defaultDisplayMode.newLine)
.append("| *WholeStageCodegen| 4| 3| -1|")
.append(defaultDisplayMode.newLine)
.append("| SortMergeJoin| 1| 1| 0|")
.append(defaultDisplayMode.newLine)
.append("+----------------------------------------------------------+-------------------+------------------+----------+")
.append(defaultDisplayMode.newLine)
.append(defaultDisplayMode.newLine)
// scalastyle:on filelinelengthchecker
val expectedOutput = getExpectedResult("selfJoin.txt")
.replace("$joinIndexLocation", truncate(s"InMemoryFileIndex[$joinIndexFilePath]"))
.replace("$joinIndexPath", joinIndexPath.toString)
.replace(
"$sampleParquetDataLocation",
truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]"))
val selfJoinDf = df.join(df, df("Col1") === df("Col1"))
verifyExplainOutput(selfJoinDf, expectedOutput.toString(), verbose = true) { df =>
@ -187,125 +90,15 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
df.createOrReplaceTempView("query")
hyperspace.createIndex(df, indexConfig)
val displayMode = new PlainTextMode(getHighlightConf("<----", "---->"))
// Constructing expected output for given query from explain API
val expectedOutput = new StringBuilder
val filterIndexFilePath = getIndexFilesPath("filterIndex")
val filterIndexPath = getIndexRootPath("filterIndex")
// The format of the explain output looks as follows:
// scalastyle:off filelinelengthchecker
expectedOutput
.append("=============================================================")
.append(displayMode.newLine)
.append("Plan with indexes:")
.append(displayMode.newLine)
.append("=============================================================")
.append(displayMode.newLine)
.append("Project [Col1#135]")
.append(displayMode.newLine)
.append("+- Filter (isnotnull(Col1#135) && (Col1#135 = Subquery subquery145))")
.append(displayMode.newLine)
.append(" : +- Subquery subquery145")
.append(displayMode.newLine)
.append(" : +- *(1) Project [Col1#135]")
.append(displayMode.newLine)
.append(" : +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))")
.append(displayMode.newLine)
.append(" <----: +- *(1) FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#136,Col1#135]")
.append(" Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ")
.append("ReadSchema: struct<Col2:int,Col1:string>---->")
.append(displayMode.newLine)
.append(" +- FileScan parquet [Col1#135] Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string>")
.append(displayMode.newLine)
.append(" +- Subquery subquery145")
.append(displayMode.newLine)
.append(" +- *(1) Project [Col1#135]")
.append(displayMode.newLine)
.append(" +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))")
.append(displayMode.newLine)
.append(" <----+- *(1) FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#136,Col1#135] " +
"Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ")
.append("ReadSchema: struct<Col2:int,Col1:string>---->")
.append(displayMode.newLine)
.append(displayMode.newLine)
.append("=============================================================")
.append(displayMode.newLine)
.append("Plan without indexes:")
.append(displayMode.newLine)
.append("=============================================================")
.append(displayMode.newLine)
.append("Project [Col1#135]")
.append(displayMode.newLine)
.append("+- Filter (isnotnull(Col1#135) && (Col1#135 = Subquery subquery145))")
.append(displayMode.newLine)
.append(" : +- Subquery subquery145")
.append(displayMode.newLine)
.append(" : +- *(1) Project [Col1#135]")
.append(displayMode.newLine)
.append(" : +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))")
.append(displayMode.newLine)
.append(" <----: +- *(1) FileScan parquet [Col1#135,Col2#136] Batched: true, " +
"Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ")
.append("ReadSchema: struct<Col1:string,Col2:int>---->")
.append(displayMode.newLine)
.append(" +- FileScan parquet [Col1#135] Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string>")
.append(displayMode.newLine)
.append(" +- Subquery subquery145")
.append(displayMode.newLine)
.append(" +- *(1) Project [Col1#135]")
.append(displayMode.newLine)
.append(" +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))")
.append(displayMode.newLine)
.append(
" <----+- *(1) FileScan parquet [Col1#135,Col2#136] Batched: true, " +
"Format: Parquet, Location: ")
.append(truncate("InMemoryFileIndex[" + sampleParquetDataFullPath + "]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], " +
"ReadSchema: struct<Col1:string,Col2:int>---->")
.append(displayMode.newLine)
.append(displayMode.newLine)
.append("=============================================================")
.append(displayMode.newLine)
.append("Indexes used:")
.append(displayMode.newLine)
.append("=============================================================")
.append(displayMode.newLine)
.append("filterIndex:" + getIndexRootPath("filterIndex"))
.append(displayMode.newLine)
.append(displayMode.newLine)
.append("=============================================================")
.append(displayMode.newLine)
.append("Physical operator stats:")
.append(displayMode.newLine)
.append("=============================================================")
.append(displayMode.newLine)
.append("+-----------------+-------------------+------------------+----------+")
.append(displayMode.newLine)
.append("|Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|")
.append(displayMode.newLine)
.append("+-----------------+-------------------+------------------+----------+")
.append(displayMode.newLine)
.append("| Filter| 1| 1| 0|")
.append(displayMode.newLine)
.append("| Project| 1| 1| 0|")
.append(displayMode.newLine)
.append("| Scan parquet| 1| 1| 0|")
.append(displayMode.newLine)
.append("|WholeStageCodegen| 1| 1| 0|")
.append(displayMode.newLine)
.append("+-----------------+-------------------+------------------+----------+")
.append(displayMode.newLine)
.append(displayMode.newLine)
// scalastyle:on filelinelengthchecker
val expectedOutput = getExpectedResult("subquery.txt")
.replace("$filterIndexLocation", truncate(s"InMemoryFileIndex[$filterIndexFilePath]"))
.replace("$filterIndexPath", filterIndexPath.toString)
.replace(
"$sampleParquetDataLocation",
truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]"))
val dfSubquery =
spark.sql("""select Col1 from query where
@ -342,113 +135,16 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
val indexConfig = IndexConfig("joinIndex", Seq("Col1"), Seq("Col2"))
hyperspace.createIndex(df, indexConfig)
val defaultDisplayMode = new PlainTextMode(getHighlightConf("", ""))
// Constructing expected output for given query from explain API
val expectedOutput = new StringBuilder
// The format of the explain output looks as follows:
val joinIndexFilePath = getIndexFilesPath("joinIndex")
val joinIndexPath = getIndexRootPath("joinIndex")
// scalastyle:off filelinelengthchecker
expectedOutput
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("Plan with indexes:")
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("SortMergeJoin [Col1#11], [Col1#21], Inner")
.append(defaultDisplayMode.newLine)
.append("<----:- *(1) Project [Col1#11, Col2#12]---->")
.append(defaultDisplayMode.newLine)
.append("<----: +- *(1) Filter isnotnull(Col1#11)---->")
.append(defaultDisplayMode.newLine)
.append(s"<----: +- *(1) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[$joinIndexFilePath]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->")
.append(defaultDisplayMode.newLine)
.append("<----+- *(2) Project [Col1#21, Col2#22]---->")
.append(defaultDisplayMode.newLine)
.append(" <----+- *(2) Filter isnotnull(Col1#21)---->")
.append(defaultDisplayMode.newLine)
.append(s" <----+- *(2) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[$joinIndexFilePath]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->")
.append(defaultDisplayMode.newLine)
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("Plan without indexes:")
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("SortMergeJoin [Col1#11], [Col1#21], Inner")
.append(defaultDisplayMode.newLine)
.append("<----:- *(2) Sort [Col1#11 ASC NULLS FIRST], false, 0---->")
.append(defaultDisplayMode.newLine)
.append("<----: +- Exchange hashpartitioning(Col1#11, 5)---->")
.append(defaultDisplayMode.newLine)
.append("<----: +- *(1) Project [Col1#11, Col2#12]---->")
.append(defaultDisplayMode.newLine)
.append("<----: +- *(1) Filter isnotnull(Col1#11)---->")
.append(defaultDisplayMode.newLine)
.append(s"<----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>---->")
.append(defaultDisplayMode.newLine)
.append("<----+- *(4) Sort [Col1#21 ASC NULLS FIRST], false, 0---->")
.append(defaultDisplayMode.newLine)
.append(" <----+- ReusedExchange [Col1#21, Col2#22], Exchange hashpartitioning(Col1#11, 5)---->")
.append(defaultDisplayMode.newLine)
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("Indexes used:")
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append(s"joinIndex:$joinIndexPath")
.append(defaultDisplayMode.newLine)
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("Physical operator stats:")
.append(defaultDisplayMode.newLine)
.append("=============================================================")
.append(defaultDisplayMode.newLine)
.append("+----------------------------------------------------------+-------------------+------------------+----------+")
.append(defaultDisplayMode.newLine)
.append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|")
.append(defaultDisplayMode.newLine)
.append("+----------------------------------------------------------+-------------------+------------------+----------+")
.append(defaultDisplayMode.newLine)
.append("| *Filter| 1| 2| 1|")
.append(defaultDisplayMode.newLine)
.append("| *InputAdapter| 4| 2| -2|")
.append(defaultDisplayMode.newLine)
.append("| *Project| 1| 2| 1|")
.append(defaultDisplayMode.newLine)
.append("| *ReusedExchange| 1| 0| -1|")
.append(defaultDisplayMode.newLine)
.append("|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)| 0| 2| 2|")
.append(defaultDisplayMode.newLine)
.append("| *Scan parquet| 1| 0| -1|")
.append(defaultDisplayMode.newLine)
.append("| *ShuffleExchange| 1| 0| -1|")
.append(defaultDisplayMode.newLine)
.append("| *Sort| 2| 0| -2|")
.append(defaultDisplayMode.newLine)
.append("| *WholeStageCodegen| 4| 3| -1|")
.append(defaultDisplayMode.newLine)
.append("| SortMergeJoin| 1| 1| 0|")
.append(defaultDisplayMode.newLine)
.append("+----------------------------------------------------------+-------------------+------------------+----------+")
.append(defaultDisplayMode.newLine)
.append(defaultDisplayMode.newLine)
// scalastyle:on filelinelengthchecker
val expectedOutput = getExpectedResult("selfJoin.txt")
.replace("$joinIndexLocation", truncate(s"InMemoryFileIndex[$joinIndexFilePath]"))
.replace("$joinIndexPath", joinIndexPath.toString)
.replace(
"$sampleParquetDataLocation",
truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]"))
val selfJoinDf = df.join(df, df("Col1") === df("Col1"))
@ -467,56 +163,20 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
val indexConfig = IndexConfig("filterIndex", Seq("Col2"), Seq("Col1"))
hyperspace.createIndex(df, indexConfig)
val expectedOutput = new StringBuilder
expectedOutput
.append(displayMode.beginEndTag.open)
.append("=============================================================")
.append(displayMode.newLine)
.append("Plan with indexes:")
.append(displayMode.newLine)
.append("=============================================================")
.append(displayMode.newLine)
.append("Project [Col1#]")
.append(displayMode.newLine)
.append("+- Filter (isnotnull(Col2#) && (Col2# = 2))")
.append(displayMode.newLine)
.append(" " + displayMode.highlightTag.open)
.append("+- FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#,Col1#] ")
.append("Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]"))
.append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ")
.append("ReadSchema: struct<Col2:int,Col1:string>" + displayMode.highlightTag.close)
.append(displayMode.newLine)
.append(displayMode.newLine)
.append("=============================================================")
.append(displayMode.newLine)
.append("Plan without indexes:")
.append(displayMode.newLine)
.append("=============================================================")
.append(displayMode.newLine)
.append("Project [Col1#]")
.append(displayMode.newLine)
.append("+- Filter (isnotnull(Col2#) && (Col2# = 2))")
.append(displayMode.newLine)
.append(" " + displayMode.highlightTag.open + "+- FileScan parquet [Col1#,Col2#] ")
.append("Batched: true, Format: Parquet, Location: ")
// Note: The below conversion converts relative path to absolute path for comparison.
.append(truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") + ", ")
.append("PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ")
.append("ReadSchema: struct<Col1:string,Col2:int>" + displayMode.highlightTag.close)
.append(displayMode.newLine)
.append(displayMode.newLine)
.append("=============================================================")
.append(displayMode.newLine)
.append("Indexes used:")
.append(displayMode.newLine)
.append("=============================================================")
.append(displayMode.newLine)
.append("filterIndex:")
.append(getIndexRootPath("filterIndex"))
.append(displayMode.newLine)
.append(displayMode.newLine)
.append(displayMode.beginEndTag.close)
val filterIndexFilePath = getIndexFilesPath("filterIndex")
val filterIndexPath = getIndexRootPath("filterIndex")
val expectedOutput = getExpectedResult("filter.txt")
.replace("$filterIndexLocation", truncate(s"InMemoryFileIndex[$filterIndexFilePath]"))
.replace("$filterIndexPath", filterIndexPath.toString)
.replace(
"$sampleParquetDataLocation",
truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]"))
.replace("$begin", displayMode.beginEndTag.open)
.replace("$end", displayMode.beginEndTag.close)
.replace("$highlightBegin", displayMode.highlightTag.open)
.replace("$highlightEnd", displayMode.highlightTag.close)
.replace("\n", displayMode.newLine)
def filterQuery(query: DataFrame): DataFrame = {
query.filter("Col2 == 2").select("Col1")
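In the ExplainTest hunks above, the long hand-built StringBuilder outputs are replaced by golden files (selfJoin.txt, subquery.txt, filter.txt) loaded through getExpectedResult and then specialized via $-placeholder substitution. A minimal sketch of such a reader, assuming the golden files sit under a test-resources folder whose exact location is not shown here:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Hedged sketch only: the real getExpectedResult and the golden-file layout
// (possibly split per Spark version) are not part of this diff.
trait ExpectedResultSupport {
  // Assumed root for expected-output files such as selfJoin.txt or filter.txt.
  protected def expectedResultRoot: String = "src/test/resources/expected"

  protected def getExpectedResult(fileName: String): String =
    new String(
      Files.readAllBytes(Paths.get(s"$expectedResultRoot/$fileName")),
      StandardCharsets.UTF_8)
}
```

The callers above then rewrite placeholders such as $joinIndexLocation or $sampleParquetDataLocation with `String.replace`, and map "\n" to the display mode's newline before comparing against the actual explain output.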

View file

@ -27,7 +27,7 @@ import com.microsoft.hyperspace.index.rules.HyperspaceRuleSuite
import com.microsoft.hyperspace.util.FileUtils
class FilterIndexRankerTest extends HyperspaceRuleSuite {
override val systemPath = new Path("src/test/resources/FilterRankerTest")
override val indexLocationDirName = "FilterRankerTest"
var tempPlan: LogicalPlan = _
override def beforeAll(): Unit = {

View file

@ -28,7 +28,7 @@ import com.microsoft.hyperspace.index.rules.HyperspaceRuleSuite
import com.microsoft.hyperspace.util.FileUtils
class JoinIndexRankerTest extends HyperspaceRuleSuite with SQLHelper {
override val systemPath = new Path("src/test/resources/JoinRankerTest")
override val indexLocationDirName = "JoinRankerTest"
var leftPlan: LogicalPlan = _
var rightPlan: LogicalPlan = _

View file

@ -29,7 +29,7 @@ import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.util.PathUtils
class FilterIndexRuleTest extends HyperspaceRuleSuite {
override val systemPath = PathUtils.makeAbsolute("src/test/resources/joinIndexTest")
override val indexLocationDirName = "joinIndexTest"
val indexName1 = "filterIxTestIndex1"
val indexName2 = "filterIxTestIndex2"

View file

@ -25,10 +25,11 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.{IntegerType, StringType}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.util.{FileUtils, PathUtils}
import com.microsoft.hyperspace.shim.{JoinWithoutHint => Join}
import com.microsoft.hyperspace.util.{FileUtils, PathUtils, SparkTestShims}
class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper {
override val systemPath = PathUtils.makeAbsolute("src/test/resources/joinIndexRuleTest")
override val indexLocationDirName = "joinIndexRuleTest"
val t1c1 = AttributeReference("t1c1", IntegerType)()
val t1c2 = AttributeReference("t1c2", StringType)()
@ -427,6 +428,7 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper {
val updatedNodeCount = plan2.treeString.split("\n").length
if (originalNodeCount == updatedNodeCount) {
import SparkTestShims.Implicits._
(0 until originalNodeCount).forall { i =>
plan1(i) match {
// for LogicalRelation, we just check if the updated also has LogicalRelation. If the
@ -434,7 +436,7 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper {
case _: LogicalRelation => plan2(i).isInstanceOf[LogicalRelation]
// for other node types, we compare exact matching between original and updated plans
case node => node.simpleString.equals(plan2(i).simpleString)
case node => node.simpleStringFull.equals(plan2(i).simpleStringFull)
}
}
} else {
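The plan comparison above goes through `SparkTestShims.Implicits` because the underlying TreeNode API differs between the two Spark lines (Spark 3 adds a maxFields parameter to simpleString). One plausible shape for the Spark 3 side of that shim, as a hedged sketch only:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hedged sketch (Spark 3.0 flavour); the real shim compiled from the
// per-version test source directory may differ.
object SparkTestShims {
  object Implicits {
    implicit class SimpleStringFullOps(plan: LogicalPlan) {
      // On Spark 2.4 the equivalent body would simply be `plan.simpleString`.
      def simpleStringFull: String = plan.simpleString(Int.MaxValue)
    }
  }
}
```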

View file

@ -26,10 +26,11 @@ import org.apache.spark.sql.types.{IntegerType, StringType}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants, IndexLogEntryTags}
import com.microsoft.hyperspace.shim.JoinWithoutHint
import com.microsoft.hyperspace.util.{FileUtils, PathUtils}
class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
override val systemPath = PathUtils.makeAbsolute("src/test/resources/ruleUtilsTest")
override val indexLocationDirName = "ruleUtilsTest"
val t1c1 = AttributeReference("t1c1", IntegerType)()
val t1c2 = AttributeReference("t1c2", StringType)()
@ -111,7 +112,7 @@ class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
}
test("Verify get logical relation for non-linear plan.") {
val joinNode = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), None)
val joinNode = JoinWithoutHint(t1ProjectNode, t2ProjectNode, JoinType("inner"), None)
val r = RuleUtils.getRelation(spark, Project(Seq(t1c3, t2c3), joinNode))
assert(r.isEmpty)
}
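RuleUtilsTest (and JoinIndexRuleTest above) now construct joins through the `JoinWithoutHint` shim, since the Join node gained a hint argument in Spark 3 that Spark 2.4 does not have. A hedged sketch of the Spark 3 side of such a shim, assuming it only hides the extra JoinHint parameter:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint, LogicalPlan}

// Hedged sketch (Spark 3.0 flavour); the real shim under the per-version
// source directory may differ. The Spark 2.4 flavour would just forward to
// Join(left, right, joinType, condition).
object JoinWithoutHint {
  def apply(
      left: LogicalPlan,
      right: LogicalPlan,
      joinType: JoinType,
      condition: Option[Expression]): Join =
    Join(left, right, joinType, condition, JoinHint.NONE)

  def unapply(join: Join): Option[(LogicalPlan, LogicalPlan, JoinType, Option[Expression])] =
    Some((join.left, join.right, join.joinType, join.condition))
}
```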

View file

@ -1 +1 @@
version in ThisBuild := "0.5.0-SNAPSHOT"
ThisBuild / version := "0.5.0-SNAPSHOT"
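The build file moves from the old `version in ThisBuild` scoping to the unified slash syntax recommended in sbt 1.x. Illustrative only, with placeholder values not taken from this build, other settings scope the same way:

```scala
// Illustrative slash-syntax scoping; values are placeholders.
ThisBuild / organization := "com.example"
ThisBuild / scalaVersion := "2.12.12"
```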