Python bindings for Hyperspace (#36)

Tarun authored 2020-08-05 14:33:02 -04:00; committed by GitHub
Parent: 1e2b4b94e5
Commit: aad15bdc33
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
15 changed files: 618 additions and 11 deletions


@@ -19,7 +19,17 @@ jobs:
  displayName: 'Running $sbt +compile'
- script: sbt +test
  displayName: 'Running $sbt +test'
- task: Bash@3
  inputs:
    filePath: 'script/download_spark.sh'
  displayName: 'Downloading spark'
- task: PythonScript@0
  inputs:
    scriptSource: 'filePath'
    scriptPath: 'run-tests.py'
  displayName: 'Running python tests'
  env:
    SPARK_HOME: $(Build.SourcesDirectory)/spark-2.4.2-bin-hadoop2.7
# If not a pull request, publish artifacts.
- ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}:
  - script: sbt +package


@@ -16,28 +16,28 @@
 name := "hyperspace-core"
+sparkVersion := "2.4.2"
 lazy val scala212 = "2.12.8"
 lazy val scala211 = "2.11.12"
 lazy val supportedScalaVersions = List(scala212, scala211)
-lazy val sparkVersion = "2.4.2"
 scalaVersion := scala212
 crossScalaVersions := supportedScalaVersions
 libraryDependencies ++= Seq(
-  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided" withSources(),
-  "org.apache.spark" %% "spark-core" % sparkVersion % "provided" withSources(),
-  "org.apache.spark" %% "spark-catalyst" % sparkVersion % "provided" withSources(),
+  "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided" withSources(),
+  "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided" withSources(),
+  "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided" withSources(),
   // Test dependencies
   "org.scalatest" %% "scalatest" % "3.0.5" % "test",
   "org.mockito" %% "mockito-scala" % "0.4.0" % "test",
-  "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
-  "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
-  "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
-  "org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests"
+  "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
+  "org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
+  "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
+  "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests"
 )
 scalacOptions ++= Seq(
@@ -61,6 +61,20 @@ lazy val testScalastyle = taskKey[Unit]("testScalastyle")
 testScalastyle := scalastyle.in(Test).toTask("").value
 (test in Test) := ((test in Test) dependsOn testScalastyle).value
+/***************************
+ * Spark Packages settings *
+ ***************************/
+spName := "microsoft/hyperspace-core"
+spAppendScalaVersion := true
+spIncludeMaven := true
+spIgnoreProvided := true
+packageBin in Compile := spPackage.value
 /***********************
  * Test configurations *
  ***********************/


@@ -59,7 +59,16 @@ Start the Spark Scala shell as follows:
 ```
 #### PySpark
-Support for Pyspark is [on the way](https://github.com/microsoft/hyperspace/pull/36).
+Install PySpark by running the following:
+```
+pip install pyspark==2.4.2
+```
+Then, run PySpark with the Hyperspace package:
+```
+pyspark --packages com.microsoft.hyperspace:hyperspace-core_2.11:0.2.0
+```
 ## Hyperspace APIs
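
For illustration, a minimal sketch of using the new bindings from that PySpark session, assuming the `python/` directory added in this change is on `PYTHONPATH`; the parquet path and column names are placeholders:

```
from hyperspace import Hyperspace, IndexConfig

hs = Hyperspace(spark)  # `spark` is the SparkSession created by the PySpark shell

# Hyperspace works over file-based data, so read the dataset from parquet.
df = spark.read.parquet("./sample.parquet").toDF("c1", "c2", "c3")

# Create a covering index on c1 that also stores c2 and c3, then list indexes.
hs.createIndex(df, IndexConfig("index1", ["c1"], ["c2", "c3"]))
hs.indexes().show()
```

Index and system paths can be pointed at a writable location via the `spark.hyperspace.index.creation.path` and `spark.hyperspace.system.path` configs, as the tests in this change do.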


@@ -14,8 +14,12 @@
* limitations under the License.
*/
resolvers += "bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/"
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.13")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.6")


@@ -0,0 +1,6 @@
from .hyperspace import Hyperspace
from .indexconfig import IndexConfig

__all__ = [
    'Hyperspace', 'IndexConfig'
]


@@ -0,0 +1,172 @@
import sys

from pyspark.sql import SparkSession, DataFrame
from py4j.java_gateway import java_import

from .indexconfig import *


class Hyperspace:
    def __init__(self, spark):
        """
        Initializes Hyperspace object.
        :param spark: sparkSession object
        :return: Hyperspace object

        >>> hyperspace = Hyperspace(spark)
        """
        self.spark = spark
        self.jvm = spark._jvm
        self.hyperspace = self.jvm.com.microsoft.hyperspace.Hyperspace(spark._jsparkSession)

    def _getJavaIndexConfig(self, index_config):
        """
        Constructs an IndexConfig Java object from the Python wrapper IndexConfig object.
        :param index_config: IndexConfig Python object
        :return: IndexConfig Java object

        >>> _getJavaIndexConfig(idx_config)
        """
        indexed_columns = self._getScalaSeqFromList(index_config.indexedColumns)
        included_columns = self._getScalaSeqFromList(index_config.includedColumns)
        _jindexConfig = self.jvm.com.microsoft.hyperspace.index.IndexConfig(
            self.jvm.java.lang.String(index_config.indexName), indexed_columns, included_columns)
        return _jindexConfig

    def _getScalaSeqFromList(self, list):
        """
        Constructs a Scala Seq from a Python list by way of a Java ArrayList.
        :param list: Python list
        :return: Seq object in Scala

        >>> _getScalaSeqFromList(list)
        """
        java_import(self.jvm, "scala.collection.JavaConversions._")
        java_import(self.jvm, "scala.collection.Seq")
        java_import(self.jvm, 'java.util.*')
        result_array_list = self.jvm.ArrayList(len(list))
        for element in list:
            result_array_list.add(self.jvm.String(element))
        return self.jvm.scala.collection.JavaConverters.asScalaIteratorConverter(
            result_array_list.iterator()).asScala().toSeq()

    def indexes(self):
        """
        Gets available indexes.
        :return: DataFrame object containing the list of indexes.

        >>> hyperspace = Hyperspace(spark)
        >>> hyperspace.indexes()
        """
        return DataFrame(self.hyperspace.indexes(), self.spark._wrapped)

    def createIndex(self, dataFrame, indexConfig):
        """
        Creates an index on the given dataframe using the given indexConfig.
        :param dataFrame: dataFrame
        :param indexConfig: indexConfig

        >>> hyperspace = Hyperspace(spark)
        >>> idxConfig = IndexConfig("indexName", ["c1"], ["c2","c3"])
        >>> df = spark.read.parquet("./sample.parquet").toDF("c1", "c2", "c3")
        >>> hyperspace.createIndex(df, idxConfig)
        """
        self.hyperspace.createIndex(dataFrame._jdf, self._getJavaIndexConfig(indexConfig))

    def deleteIndex(self, indexName):
        """
        Soft-deletes the given index.
        :param indexName: index name

        >>> hyperspace = Hyperspace(spark)
        >>> hyperspace.deleteIndex("indexname")
        """
        self.hyperspace.deleteIndex(indexName)

    def restoreIndex(self, indexName):
        """
        Restores the index with the given index name.
        :param indexName: index name

        >>> hyperspace = Hyperspace(spark)
        >>> hyperspace.restoreIndex("indexname")
        """
        self.hyperspace.restoreIndex(indexName)

    def vacuumIndex(self, indexName):
        """
        Vacuums the index with the given index name.
        :param indexName: index name

        >>> hyperspace = Hyperspace(spark)
        >>> hyperspace.vacuumIndex("indexname")
        """
        self.hyperspace.vacuumIndex(indexName)

    def refreshIndex(self, indexName):
        """
        Updates the index to cover the latest version of the data.
        :param indexName: index name

        >>> hyperspace = Hyperspace(spark)
        >>> hyperspace.refreshIndex("indexname")
        """
        self.hyperspace.refreshIndex(indexName)

    def cancel(self, indexName):
        """
        Cancels an ongoing index operation to bring the index back from an inconsistent state
        to the last known stable state.
        :param indexName: index name

        >>> hyperspace = Hyperspace(spark)
        >>> hyperspace.cancel("indexname")
        """
        self.hyperspace.cancel(indexName)

    def explain(self, df, verbose=False, redirectFunc=lambda x: sys.stdout.write(x)):
        """
        Explains how indexes will be applied to the given dataframe.
        :param df: dataFrame
        :param verbose: flag to enable verbose output
        :param redirectFunc: optional function to redirect output of explain

        >>> hyperspace = Hyperspace(spark)
        >>> df = spark.read.parquet("./sample.parquet").toDF("c1", "c2", "c3")
        >>> hyperspace.explain(df)
        """
        analyzer = self.jvm.com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer
        result_string = analyzer.explainString(df._jdf, self.spark._jsparkSession,
                                               self.hyperspace.indexes(), verbose)
        redirectFunc(result_string)

    @staticmethod
    def enable(spark):
        """
        Enables Hyperspace index usage on the given Spark session.
        :param spark: sparkSession

        >>> Hyperspace.enable(spark)
        """
        spark._jvm.com.microsoft.hyperspace.util.PythonUtils.enableHyperspace(spark._jsparkSession)
        return spark

    @staticmethod
    def disable(spark):
        """
        Disables Hyperspace index usage on the given Spark session.
        :param spark: sparkSession

        >>> Hyperspace.disable(spark)
        """
        spark._jvm.com.microsoft.hyperspace.util.PythonUtils.disableHyperspace(spark._jsparkSession)
        return spark

    @staticmethod
    def isEnabled(spark):
        """
        Checks whether Hyperspace is enabled.
        :param spark: sparkSession

        >>> Hyperspace.isEnabled(spark)
        """
        return spark._jvm.com.microsoft.hyperspace.util.PythonUtils. \
            isHyperspaceEnabled(spark._jsparkSession)
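
A hedged end-to-end sketch of the wrapper above, assuming the hyperspace-core jar is on the Spark classpath (for example via `--packages`) and the `python/` directory is importable; the paths and dataset are placeholders:

```
from pyspark.sql import SparkSession
from hyperspace import Hyperspace, IndexConfig

spark = SparkSession.builder.master("local[*]").appName("hyperspace-demo").getOrCreate()
# Placeholder paths; the tests in this change set the same two configs.
spark.conf.set("spark.hyperspace.index.creation.path", "/tmp/hyperspace/indexes")
spark.conf.set("spark.hyperspace.system.path", "/tmp/hyperspace/indexes")

hs = Hyperspace(spark)

# Persist a small dataset and read it back from parquet, as the tests in this change do.
spark.createDataFrame([("Alice", 25), ("Bob", 27)], ["name", "age"]) \
    .write.mode("overwrite").parquet("/tmp/people.parquet")
df = spark.read.parquet("/tmp/people.parquet")

hs.createIndex(df, IndexConfig("peopleIndex", ["name"], ["age"]))
hs.indexes().show()

Hyperspace.enable(spark)                      # let the optimizer consider indexes
hs.explain(df.filter("name = 'Alice'").select("age"))

hs.deleteIndex("peopleIndex")                 # soft delete ...
hs.restoreIndex("peopleIndex")                # ... and bring it back
```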


@@ -0,0 +1,14 @@
class IndexConfig:
    def __init__(self, indexName, indexedColumns, includedColumns):
        """
        Initializes IndexConfig object.
        :param indexName: index name
        :param indexedColumns: indexed columns
        :param includedColumns: included columns
        :return: IndexConfig object

        >>> idxConfig = IndexConfig("indexName", ["c1"], ["c2","c3"])
        """
        self.indexName = indexName
        self.indexedColumns = indexedColumns
        self.includedColumns = includedColumns


@@ -0,0 +1,16 @@
import unittest
import sys

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession


class HyperspaceTestCase(unittest.TestCase):
    def setUp(self):
        self._old_sys_path = list(sys.path)
        class_name = self.__class__.__name__
        self.spark = SparkSession.builder.master("local").appName(class_name).getOrCreate()

    def tearDown(self):
        self.spark.stop()
        sys.path = self._old_sys_path


@@ -0,0 +1,73 @@
import unittest
import tempfile
import shutil
import os
import sys

from pyspark.sql import SparkSession

from hyperspace import *
from hyperspace.testing.utils import HyperspaceTestCase


class HyperspaceIndexManagementTests(HyperspaceTestCase):
    def setUp(self):
        super(HyperspaceIndexManagementTests, self).setUp()
        self.temp_path = tempfile.mkdtemp()
        self.temp_index_path = tempfile.mkdtemp()
        self.spark.conf.set("spark.hyperspace.index.creation.path", self.temp_index_path)
        self.spark.conf.set("spark.hyperspace.system.path", self.temp_index_path)
        data = [('Alice', 25, 'Seattle'), ('Bob', 27, 'Bellevue')]
        df = self.spark.createDataFrame(data, ['name', 'age', 'city'])
        self.data_file = os.path.join(self.temp_path, "tempFile.parquet")
        df.write.parquet(self.data_file)
        self.df = self.spark.read.parquet(self.data_file)
        self.hyperspace = Hyperspace(self.spark)

    def tearDown(self):
        shutil.rmtree(self.temp_path)
        shutil.rmtree(self.temp_index_path)
        super(HyperspaceIndexManagementTests, self).tearDown()

    def test_index_create(self):
        idx_config = IndexConfig('idx1', ['name'], ['age'])
        self.hyperspace.createIndex(self.df, idx_config)
        self.assertEqual(self.hyperspace.indexes().filter("""name = "idx1" """).count(), 1)

    def test_index_delete(self):
        idx_config = IndexConfig('idx2', ['name'], ['age'])
        self.hyperspace.createIndex(self.df, idx_config)
        self.assertEqual(self.hyperspace.indexes().filter(
            """name = "idx2" and state = "ACTIVE" """).count(), 1)
        self.assertEqual(self.hyperspace.indexes().filter(
            """name = "idx2" and state = "DELETED" """).count(), 0)
        self.hyperspace.deleteIndex("idx2")
        self.assertEqual(self.hyperspace.indexes().filter(
            """name = "idx2" and state = "DELETED" """).count(), 1)
        self.assertEqual(self.hyperspace.indexes().filter(
            """name = "idx2" and state = "ACTIVE" """).count(), 0)

    def test_index_restore(self):
        idx_config = IndexConfig('idx3', ['name'], ['age'])
        self.hyperspace.createIndex(self.df, idx_config)
        self.hyperspace.deleteIndex("idx3")
        self.assertEqual(self.hyperspace.indexes().filter(
            """name = "idx3" and state = "DELETED" """).count(), 1)
        self.hyperspace.restoreIndex("idx3")
        self.assertEqual(self.hyperspace.indexes().filter(
            """name = "idx3" and state = "ACTIVE" """).count(), 1)
        self.assertEqual(self.hyperspace.indexes().filter(
            """name = "idx3" and state = "DELETED" """).count(), 0)

    def test_index_vacuum(self):
        idx_config = IndexConfig('idx4', ['name'], ['age'])
        self.hyperspace.createIndex(self.df, idx_config)
        self.hyperspace.deleteIndex("idx4")
        self.assertEqual(self.hyperspace.indexes().filter(
            """name = "idx4" and state = "DELETED" """).count(), 1)
        self.hyperspace.vacuumIndex("idx4")
        self.assertEqual(self.hyperspace.indexes().filter("""name = "idx4" """).count(), 0)


hyperspace_test = unittest.TestLoader().loadTestsFromTestCase(HyperspaceIndexManagementTests)
result = unittest.TextTestRunner(verbosity=3).run(hyperspace_test)
sys.exit(not result.wasSuccessful())


@@ -0,0 +1,59 @@
import unittest
import tempfile
import shutil
import os
import sys

from pyspark.sql import SQLContext

from hyperspace import *
from hyperspace.testing.utils import HyperspaceTestCase


class HyperspaceIndexUtilizationTests(HyperspaceTestCase):
    def setUp(self):
        super(HyperspaceIndexUtilizationTests, self).setUp()
        self.temp_path = tempfile.mkdtemp()
        self.temp_index_path = tempfile.mkdtemp()
        self.spark.conf.set("spark.hyperspace.index.creation.path", self.temp_index_path)
        self.spark.conf.set("spark.hyperspace.system.path", self.temp_index_path)
        data = [('Alice', 25, 'Seattle', 'Dept1'), ('Bob', 27, 'Bellevue', 'Dept2')]
        df = self.spark.createDataFrame(data, ['name', 'age', 'city', 'department'])
        self.data_file = os.path.join(self.temp_path, "tempFile.parquet")
        df.write.parquet(self.data_file)
        self.df = self.spark.read.parquet(self.data_file)
        self.hyperspace = Hyperspace(self.spark)

    def tearDown(self):
        shutil.rmtree(self.temp_path)
        shutil.rmtree(self.temp_index_path)
        super(HyperspaceIndexUtilizationTests, self).tearDown()

    def test_hyperspace_enable_disable(self):
        self.assertFalse(Hyperspace.isEnabled(self.spark),
                         "Hyperspace must be disabled by default.")
        Hyperspace.enable(self.spark)
        self.assertTrue(Hyperspace.isEnabled(self.spark),
                        "Hyperspace must be enabled after Hyperspace enable.")
        Hyperspace.disable(self.spark)
        self.assertFalse(Hyperspace.isEnabled(self.spark),
                         "Hyperspace must be disabled after Hyperspace disable.")

    def test_hyperspace_explain(self):
        idx_config = IndexConfig('idx1', ['age'], ['name', 'department'])
        self.hyperspace.createIndex(self.df, idx_config)
        self.df.createOrReplaceTempView("employees")
        filter_query = self.spark.sql(""" SELECT age, name, department FROM employees
            WHERE employees.age > 26 """)

        def verify_result(input):
            planAnalyzer = self.spark._jvm.com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer
            jvmExplainString = planAnalyzer.explainString(filter_query._jdf,
                                                          self.spark._jsparkSession,
                                                          self.hyperspace.indexes()._jdf, False)
            self.assertTrue(input == jvmExplainString)

        self.hyperspace.explain(filter_query, False, verify_result)


hyperspace_test = unittest.TestLoader().loadTestsFromTestCase(HyperspaceIndexUtilizationTests)
result = unittest.TextTestRunner(verbosity=3).run(hyperspace_test)
sys.exit(not result.wasSuccessful())

python/run-tests.py (new file, 120 lines)

@@ -0,0 +1,120 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# This file contains code from the Apache Spark project (original license above)
# and Delta Lake project (same Apache 2.0 license above).
# It contains modifications, which are licensed as follows:
#
#
# Copyright (2020) The Hyperspace Project Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import fnmatch
import subprocess
import sys
import shutil
from os import path, environ


def test(root_dir, package):
    # Run all of the tests under the python/hyperspace/tests directory. Each test file
    # has a main entry point that uses Python's unittest framework.
    python_root_dir = path.join(root_dir, "python")
    test_dir = path.join(python_root_dir, path.join("hyperspace", "tests"))
    test_files = [os.path.join(test_dir, f) for f in os.listdir(test_dir)
                  if os.path.isfile(os.path.join(test_dir, f)) and
                  f.endswith(".py") and not f.startswith("_")]
    extra_class_path = path.join(python_root_dir, path.join("hyperspace", "testing"))

    for test_file in test_files:
        try:
            if environ.get('SPARK_HOME') is None:
                print("SPARK_HOME is not set in the environment variables.")
                sys.exit(1)
            my_env = os.environ.copy()
            cmd = [os.path.join(my_env["SPARK_HOME"], os.path.join("bin", "spark-submit")),
                   "--driver-class-path=%s" % extra_class_path,
                   "--jars=%s" % extra_class_path,
                   "--packages", package, test_file]
            print("Running tests in %s\n=============" % test_file)
            run_cmd(cmd, stream_output=True, env=my_env)
        except:
            print("Failed tests in %s" % (test_file))
            raise


def delete_if_exists(path):
    # If the path exists, delete it.
    if os.path.exists(path):
        shutil.rmtree(path)
        print("Deleted %s " % path)


def prepare(root_dir):
    sbt_path = "sbt"
    delete_if_exists(os.path.expanduser("~/.ivy2/cache/com/microsoft/hyperspace"))
    delete_if_exists(os.path.expanduser("~/.m2/repository/com/microsoft/hyperspace"))
    run_cmd([sbt_path, "clean", "publishM2"], stream_output=True)
    package = "com.microsoft.hyperspace:hyperspace-core_2.12:0.2.0-SNAPSHOT"
    return package


def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs):
    cmd_env = os.environ.copy()
    if env:
        cmd_env.update(env)

    if stream_output:
        child = subprocess.Popen(cmd, env=cmd_env, **kwargs)
        exit_code = child.wait()
        if throw_on_error and exit_code != 0:
            raise Exception("Non-zero exitcode: %s" % (exit_code))
        return exit_code
    else:
        child = subprocess.Popen(
            cmd,
            env=cmd_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            **kwargs)
        (stdout, stderr) = child.communicate()
        exit_code = child.wait()
        if throw_on_error and exit_code != 0:
            raise Exception(
                "Non-zero exitcode: %s\n\nSTDOUT:\n%s\n\nSTDERR:%s" %
                (exit_code, stdout, stderr))
        return (exit_code, stdout, stderr)


if __name__ == "__main__":
    root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    package = prepare(root_dir)
    test(root_dir, package)
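
A small sketch of invoking this runner locally; the Spark location below is an assumption, and any Spark 2.4.2 installation pointed to by SPARK_HOME should do:

```
import os
import subprocess

env = os.environ.copy()
# Assumed location; the runner exits early if SPARK_HOME is not set.
env.setdefault("SPARK_HOME", os.path.expanduser("~/spark-2.4.2-bin-hadoop2.7"))

# python/run-tests.py publishes hyperspace-core locally with sbt, then runs each
# file under python/hyperspace/tests via spark-submit. Run from the repository root.
subprocess.check_call(["python", "python/run-tests.py"], env=env)
```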

run-tests.py (new file, 84 lines)

@@ -0,0 +1,84 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# This file contains code from the Apache Spark project (original license above)
# and Delta Lake project (same Apache 2.0 license above).
# It contains modifications, which are licensed as follows:
#
#
# Copyright (2020) The Hyperspace Project Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import fnmatch
import subprocess
from os import path
import random
import string
import tempfile


def run_python_tests(root_dir):
    print("##### Running Python tests #####")
    python_test_script = path.join(root_dir, path.join("python", "run-tests.py"))
    print("Calling script %s" % python_test_script)
    run_cmd(["python", python_test_script], stream_output=True)


def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs):
    cmd_env = os.environ.copy()
    if env:
        cmd_env.update(env)

    if stream_output:
        child = subprocess.Popen(cmd, env=cmd_env, **kwargs)
        exit_code = child.wait()
        if throw_on_error and exit_code != 0:
            raise Exception("Non-zero exitcode: %s" % (exit_code))
        return exit_code
    else:
        child = subprocess.Popen(
            cmd,
            env=cmd_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            **kwargs)
        (stdout, stderr) = child.communicate()
        exit_code = child.wait()
        if throw_on_error and exit_code != 0:
            raise Exception(
                "Non-zero exitcode: %s\n\nSTDOUT:\n%s\n\nSTDERR:%s" %
                (exit_code, stdout, stderr))
        return (exit_code, stdout, stderr)


if __name__ == "__main__":
    root_dir = os.path.dirname(os.path.abspath(__file__))
    run_python_tests(root_dir)

script/download_spark.sh (new file, 26 lines)

@@ -0,0 +1,26 @@
#!/usr/bin/env bash
#
#
# Copyright (2020) The Hyperspace Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# A utility script for the build pipeline to download and extract the Spark binaries
# needed to run the Python tests.
SPARK_VERSION="2.4.2"
HADOOP_VERSION="2.7"
SPARK_DIR="spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"
curl -k -L -o "spark-${SPARK_VERSION}.tgz" "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_DIR}.tgz" && tar xzvf "spark-${SPARK_VERSION}.tgz"