Initial source commit
This commit is contained in:
Parent: 5b530c5016
Commit: 85cea7edb1
@@ -20,6 +20,8 @@ parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg

@@ -38,12 +40,14 @@ pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

@@ -55,6 +59,7 @@ coverage.xml
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/

@@ -72,11 +77,26 @@ target/
# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# celery beat schedule file
# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

@@ -102,3 +122,8 @@ venv.bak/

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

@@ -0,0 +1,19 @@
{
  "name": "Example Job to Run Deployed Egg",
  "new_cluster": {
    "spark_version": "5.3.x-scala2.11",
    "node_type_id": "Standard_DS3_v2",
    "num_workers": 1
  },
  "libraries": [
    {
      "egg": "dbfs:/FileStore/jars/67dd8695_e72b_4872_9f0b_b722fced6252-sparksimpleapp_1_0_0_py3_6-277b3.egg"
    }
  ],
  "timeout_seconds": 3600,
  "max_retries": 1,
  "spark_python_task": {
    "python_file": "dbfs:/mnt/input/py/main.py",
    "parameters": ["/mnt/input/bank/bank.csv", "/mnt/output/SparkSimpleAppPY/test.csv"]
  }
}
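This job definition is not used verbatim: at release time, deployment.py (added later in this commit) overwrites its `libraries` and `spark_python_task` sections with the freshly built egg and the uploaded `main.py` before handing the JSON to the Databricks CLI. A minimal sketch of that patching, assuming the file above is saved locally as `job.json` and using a hypothetical egg file name:

```python
# Sketch only: deployment.py performs this kind of rewrite; the egg name and the
# local "job.json" path below are assumptions made for illustration.
import json
import posixpath

with open("job.json") as fp:
    job_def = json.load(fp)

egg_name = "sparksimpleapp-1.0.0-py3.6.egg"  # hypothetical bdist_egg artifact name
job_def["libraries"] = [{"egg": posixpath.join("dbfs:/mnt/jars", egg_name)}]
job_def["spark_python_task"] = {
    "python_file": "dbfs:/mnt/jars/main.py",
    "parameters": ["/mnt/input/bank/bank.csv", "/mnt/output/SparkSimpleAppPY/test.csv"],
}

# The patched JSON is what ultimately gets passed to `databricks jobs create --json ...`
print(json.dumps(job_def, indent=2))
```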
@@ -0,0 +1,21 @@
import sys

from pyspark.sql import SparkSession
from sparksimpleapp import rdd_csv_col_count


if __name__ == "__main__":

    dataInput = sys.argv[1]
    dataOutput = sys.argv[2]

    spark = (SparkSession.builder
             .appName("SparkSimpleApp")
             .getOrCreate()
             )

    rdd = spark.sparkContext.textFile(dataInput)

    rdd1 = rdd_csv_col_count(rdd)

    rdd1.toDF().write.mode('overwrite').csv(dataOutput)
@@ -0,0 +1 @@
pytest
@@ -0,0 +1,12 @@
from setuptools import setup

setup(
    name='sparksimpleapp',
    version='1.0.0',
    description='A simple Python app that counts rows in a file (arg1) and then saves that count to a path (arg2).',
    url='http://microsoft.com/',
    author='Data and AI Core Enablement',
    author_email='wijohns@microsoft.com',
    packages=['sparksimpleapp'],
    zip_safe=False
)
@@ -0,0 +1 @@
from .app import *
@@ -0,0 +1,3 @@
def rdd_csv_col_count(rdd):
    rdd1 = rdd.map(lambda s: (len(s.split(",")),))
    return rdd1
@@ -0,0 +1,15 @@
from sparksimpleapp import rdd_csv_col_count


class mockRDD(object):
    def __init__(self, values):
        self.values = values

    def map(self, func):
        return map(func, self.values)


def test_rdd_csv_col_count():
    rdd = mockRDD(['a,b', 'c', 'd,e,f,'])
    expected = [2, 1, 4]
    results = rdd_csv_col_count(rdd)
    assert all([x == y[0] for x, y in zip(expected, results)])
README.md
@@ -1,64 +1,177 @@
---
page_type: sample
languages:
- csharp
products:
- dotnet
description: "Add 150 character max description"
urlFragment: "update-this-to-unique-url-stub"
---
# DevOps for a Spark Jar, Egg Jobs

**Assumptions:**
1. You have created one or more (Dev/QA and Prod) Databricks Workspaces.
1. You have generated a [Personal Access Token (PAT)](https://docs.databricks.com/dev-tools/api/latest/authentication.html#generate-a-token).
1. You have created at least one mount point called `/mnt/jars` to house the jars or egg packages.
    * This demo also expects two other mount points:
        * `/mnt/input` with a file called `/mnt/input/bank/bank.csv` pulled from the [UCI Machine Learning Repo](https://archive.ics.uci.edu/ml/datasets/Bank+Marketing)
        * `/mnt/output`
1. You are familiar with either Scala or Python.

## Build Pipeline

### ScalaSpark_App_Build
In the build pipeline, you are going to create the Jar as an artifact produced by the build.

1. Create a new Build Pipeline and name it ScalaSpark_App_Build.
1. Choose "Use the classic editor" for a visual development experience.
1. Select a source:
    * Azure Repos Git
    * Repository: D_AI_CE_DatabricksSparkDevOps
    * Default Branch: master
1. Search for a "Maven" template and apply it.
1. In the Maven task, unlink the default Maven POM file and re-link it with `Scala/SparkSimpleApp/pom.xml`.
1. All other default options are okay.
1. Save & queue. You should now have a running pipeline.

### Pyspark_App_Build
1. Create a new Build Pipeline and name it Pyspark_App_Build.
1. Choose "Use the classic editor" for a visual development experience.
1. Select a source:
    * Azure Repos Git
    * Repository: D_AI_CE_DatabricksSparkDevOps
    * Default Branch: master
1. Search for a "Python" template and apply it.
    * Delete the disabled Flake8 task.
    * For each task, if the "Advanced > Working Directory" option is present, set it to `Python/SparkSimpleApp`.
1. Select the versions of Python you will test against by changing the variable `python.version` to `3.6, 3.7, 3.8`.
1. Change the `pytest command line` task to `pip install .\ && pip install pytest && pytest tests --doctest-modules --junitxml=junit/test-results.xml`.
    * This installs the current working directory's package (`pip install .\` with the working directory set to `Python/SparkSimpleApp`).
1. Set the `Use Python` task under Publish to `3.7`.
1. Change the `Build sdist` task's script to `python setup.py bdist_egg` and change its Display name to `Build egg`.
1. Change the `Publish Artifact: dist` task's Path to Publish to `Python/SparkSimpleApp/dist`.
1. All other default options are okay.
1. Save & queue. You should now have a running pipeline.

## Release Pipelines

The release pipeline allows you to deploy your jar or egg job to your target compute: Databricks Spark. Create your Release pipeline by going to Pipelines > Releases > + New Release Pipeline. Start with an Empty Job.

!['Release Pipeline Artifacts'](docs/img/artifacts-release.png)

* Add two artifacts:
    * Build: Choose the source build pipeline, the default version of Latest, and the default Source Alias (`_ScalaSpark_App_Build` or `_Pyspark_App_Build`).
    * Azure Repos Git: Choose the repository that your code exists in. Name it **_code**.

* Add two stages:
    * QA
    * Prod

* Add two variable groups:
    * DevDatabricksVariables: Region and Token for the Dev/QA environment
        * Add `DATABRICKS_REGION` (e.g. centralus or eastus2)
        * Add `DATABRICKS_TOKEN` and add your [Personal Access Token](https://docs.databricks.com/dev-tools/api/latest/authentication.html#generate-a-token). Apply the Lock to make it a hidden variable.
        * Choose the Scope as Stage > QA
    * ProdDatabricksVariables: Region and Token for the production environment
        * Add `DATABRICKS_REGION` (e.g. centralus or eastus2)
        * Add `DATABRICKS_TOKEN` and add your [Personal Access Token](https://docs.databricks.com/dev-tools/api/latest/authentication.html#generate-a-token). Apply the Lock to make it a hidden variable.
        * Choose the Scope as Stage > Prod

* Add the Microsoft DevLabs [DevOps for Azure Databricks](https://marketplace.visualstudio.com/items?itemName=riserrad.azdo-databricks) extension. It provides handy tasks that sit on top of the Databricks CLI, but it forces the Release pipelines to use a Windows agent.
    * There is another extension by Data Thirst: [Databrick Script Deployment Task](https://marketplace.visualstudio.com/items?itemName=DataThirstLtd.databricksDeployScriptsTasks).
    * Feel free to explore this extension as it has additional UI-driven tasks for the Databricks CLI.

### Release Scala Databricks

!['Release Tasks for Scala Jar'](docs/img/dbr-jar-release.png)

Add the following tasks to both the QA and Prod stages (Pro Tip: you can do this once in QA and then clone the stage and rename it).

1. Use Python Version
    * Set Version Spec to 3.6
1. Configure Databricks (from Microsoft DevLabs)
    * Set Workspace URL to `https://$(DATABRICKS_REGION).azuredatabricks.net`
    * Set Access Token to `$(DATABRICKS_TOKEN)`
    * This creates a Databricks configuration profile named `AZDO`. We pass this to the deployment.py file.
1. Databricks file to DBFS
    * Set Azure Region to `$(DATABRICKS_REGION)`
    * Set Local Root Folder to `$(System.DefaultWorkingDirectory)/_ScalaSpark_App_Build/drop/Scala/SparkSimpleApp/target`
    * Set File Pattern to `*.jar`
    * Set Target folder in DBFS to `/mnt/jars/`
    * Set Security
        * Authentication Method: Bearer Token
        * Databricks Bearer token: `$(DATABRICKS_TOKEN)`
1. Python Script
    * Script Source: File Path
    * Script Path: `$(System.DefaultWorkingDirectory)/_code/deployment.py`
    * Arguments:

```
jar $(System.DefaultWorkingDirectory)/_ScalaSpark_App_Build/drop/Scala/SparkSimpleApp/target dbfs:/mnt/jars $(System.DefaultWorkingDirectory)/_code/Scala/SparkSimpleApp/job.json --main-class com.microsoft.spark.example.SparkSimpleApp --parameters "/mnt/input/bank/bank.csv" "/mnt/output/SparkSimpleAppPY/test.csv" --profile AZDO
```

You now have a working release pipeline! Save and execute the Release!

### Release Egg Databricks

!['Release Tasks for Python Egg'](docs/img/dbr-egg-release.png)

Add the following tasks to both the QA and Prod stages (Pro Tip: you can do this once in QA and then clone the stage and rename it).

1. Use Python Version
    * Set Version Spec to 3.6
1. Configure Databricks (from Microsoft DevLabs)
    * Set Workspace URL to `https://$(DATABRICKS_REGION).azuredatabricks.net`
    * Set Access Token to `$(DATABRICKS_TOKEN)`
    * This creates a Databricks configuration profile named `AZDO`. We pass this to the deployment.py file.
1. Databricks file to DBFS
    * Set Azure Region to `$(DATABRICKS_REGION)`
    * Set Local Root Folder to `$(System.DefaultWorkingDirectory)/_Pyspark_App_Build/dist`
    * Set File Pattern to `*.egg`
    * Set Target folder in DBFS to `/mnt/jars/`
    * Set Security
        * Authentication Method: Bearer Token
        * Databricks Bearer token: `$(DATABRICKS_TOKEN)`
1. Databricks file to DBFS
    * Set the settings the same as above, with the following exceptions:
        * Set Local Root Folder to `$(System.DefaultWorkingDirectory)/_code/Python/SparkSimpleApp`
        * Set File Pattern to `main.py`
1. Python Script
    * Script Source: File Path
    * Script Path: `$(System.DefaultWorkingDirectory)/_code/deployment.py`
    * Arguments:

```
egg $(System.DefaultWorkingDirectory)/_Pyspark_App_Build/dist/ dbfs:/mnt/jars $(System.DefaultWorkingDirectory)/_code/Python/SparkSimpleApp/job.json --python-file "dbfs:/mnt/jars/main.py" --parameters "/mnt/input/bank/bank.csv" "/mnt/output/SparkSimpleAppPY/test.csv" --profile AZDO
```

You now have a working release pipeline! Save and execute the Release!

# deployment.py

The deployment.py file abstracts the calls to the Databricks CLI and lets you replace text in the job's JSON definition.

The help output below describes its usage.

```
usage: deployment.py [-h] [--python-file PYTHON_FILE]
                     [--main-class MAIN_CLASS]
                     [--parameters [PARAMETERS [PARAMETERS ...]]]
                     [--profile PROFILE]
                     [--update-if-exists UPDATE_IF_EXISTS UPDATE_IF_EXISTS]
                     {jar,egg} library_path cloud_path job_json

Deploy a set of jar or egg files as a Spark application

positional arguments:
  {jar,egg}             Valid options are jar or egg
  library_path          The library or folder containing libraries to include
  cloud_path            The path in the cloud (e.g. DBFS, WASB) that the
                        library is located
  job_json              The path to the job definition (only applicable to
                        Databricks)

optional arguments:
  -h, --help            show this help message and exit
  --python-file PYTHON_FILE
                        The python file that runs the python application
  --main-class MAIN_CLASS
                        The main class of your scala jar application
  --parameters [PARAMETERS [PARAMETERS ...]]
                        List of parameters that get passed directly to the
                        spark jar / python task
  --profile PROFILE     Profile name to be passed to the databricks CLI
  --update-if-exists UPDATE_IF_EXISTS UPDATE_IF_EXISTS
                        Looks for a job_id or name (useful only for
                        Databricks deployments)
```

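You can also drive deployment.py outside of the release pipeline. The sketch below mirrors the arguments the egg "Python Script" release task passes, but uses repo-relative paths from a local checkout instead of `$(System.DefaultWorkingDirectory)`; it assumes the Databricks CLI is installed and configured with an `AZDO` profile, and it adds the optional `--update-if-exists` flag so repeated runs update the same job by name.

```python
# Sketch: calling deployment.py the way the release task does, from a local checkout.
# The relative paths and the use of --update-if-exists are assumptions for this example.
import subprocess

cmd = [
    "python", "deployment.py", "egg",
    "Python/SparkSimpleApp/dist/",         # library_path: folder containing the built egg
    "dbfs:/mnt/jars",                      # cloud_path: where the egg was uploaded
    "Python/SparkSimpleApp/job.json",      # job_json: the job definition to patch
    "--python-file", "dbfs:/mnt/jars/main.py",
    "--parameters", "/mnt/input/bank/bank.csv", "/mnt/output/SparkSimpleAppPY/test.csv",
    "--profile", "AZDO",
    "--update-if-exists", "name", "Example Job to Run Deployed Egg",
]
subprocess.run(cmd, check=True)
```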
@@ -0,0 +1,18 @@
{
  "name": "Example Job to Run Deployed Jar",
  "new_cluster": {
    "spark_version": "5.3.x-scala2.11",
    "node_type_id": "Standard_DS3_v2",
    "num_workers": 1
  },
  "libraries": [
    {
      "jar": "dbfs:__REPLACE_WITH_JAR_PATH__"
    }
  ],
  "timeout_seconds": 3600,
  "max_retries": 1,
  "spark_jar_task": {
    "main_class_name": "com.microsoft.spark.example.SparkSimpleApp"
  }
}
@@ -0,0 +1,118 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.microsoft.spark.example</groupId>
  <artifactId>SparkSimpleApp</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>A simple Scala app that counts rows in a file (arg1) and then saves that count to a path (arg2).</description>
  <inceptionYear>2018</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>
  <repositories>
    <!-- REPLACE WITH YOUR ARTIFACT CODE -->
  </repositories>

  <distributionManagement>
    <!-- REPLACE WITH YOUR ARTIFACT CODE -->
  </distributionManagement>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.8</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.3.0</spark.version>
    <spec2.version>4.2.0</spec2.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compat.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Test -->
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>3.0.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.holdenkarau</groupId>
      <artifactId>spark-testing-base_2.11</artifactId>
      <version>${spark.version}_0.11.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.21.0</version>
        <configuration>
          <!-- Tests will be run with scalatest-maven-plugin instead -->
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>2.0.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>TestSuiteReport.txt</filereports>
          <!-- Comma separated list of JUnit test class names to execute -->
          <suites>tests.AppTest</suites>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
@@ -0,0 +1,29 @@
package com.microsoft.spark.example

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD

import org.apache.spark.sql._

object SparkSimpleApp {

  def rdd_csv_col_count(inputRDD: RDD[String]): RDD[Integer] = {
    return inputRDD.map(s => s.split(",").size)
  }

  def main(arg: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkSimpleApp").getOrCreate()
    import spark.implicits._
    val dataInput = arg(0)
    val dataOutput = arg(1)

    val rdd = spark.sparkContext.textFile(dataInput)

    // count the number of comma-separated columns in each row of the CSV
    val rdd1 = rdd_csv_col_count(rdd)

    rdd1.toDF().write.mode(SaveMode.Overwrite).csv(dataOutput)
  }
}
@@ -0,0 +1,19 @@
package tests

import org.scalatest.FunSuite
import com.holdenkarau.spark.testing.SharedSparkContext
import com.holdenkarau.spark.testing.RDDComparisons
import org.apache.spark.rdd.RDD

import com.microsoft.spark.example.SparkSimpleApp

class AppTest extends FunSuite with SharedSparkContext with RDDComparisons {

  test("test RDDComparisons") {
    val inputRDD = sc.parallelize(Seq("a,b,c", "a", "d,e"))
    val expectedRDD: RDD[Integer] = sc.parallelize(Seq(3, 1, 2))
    val resultRDD = SparkSimpleApp.rdd_csv_col_count(inputRDD)

    assertRDDEqualsWithOrder(expectedRDD, resultRDD)
  }
}
@@ -0,0 +1,141 @@
import argparse
import json
import os
import posixpath
import re
import subprocess


def parse_jobs_list(cli_output):
    """
    Parse Databricks CLI output of `databricks jobs list` to return
    a list of job ids and their names.
    """
    jobs = cli_output.decode('utf-8').replace('\r\n', '\n').split('\n')
    output = {}
    for job in jobs:
        matches = re.search(r'(\d+) +(.+)', job)
        if matches:
            output[matches.group(1)] = matches.group(2)
    return output


if __name__ == "__main__":
    """
    Creates a Spark application deployment by wrapping the Databricks CLI
    and modifying the related job json file.
    """
    parser = argparse.ArgumentParser(
        description="Deploy a set of jar or egg files as a Spark application"
    )
    parser.add_argument('objective',
        default="jar",
        choices=["jar", "egg"],
        help="Valid options are jar or egg")
    parser.add_argument('library_path',
        help="The library or folder containing libraries to include")
    parser.add_argument('cloud_path',
        help="The path in the cloud (e.g. DBFS, WASB) that the library is located")
    parser.add_argument('job_json',
        help="The path to the job definition (only applicable to Databricks)")
    parser.add_argument('--python-file',
        help="The python file that runs the python application")
    parser.add_argument('--main-class',
        help="The main class of your scala jar application")
    parser.add_argument('--parameters',
        nargs='*',
        default=[],
        help="List of parameters that get passed directly to the spark jar / python task"
    )
    parser.add_argument('--profile',
        default=None,
        help="Profile name to be passed to the databricks CLI"
    )
    parser.add_argument('--update-if-exists',
        nargs=2,
        default=None,
        help="Looks for a job_id or name (useful only for Databricks deployments)"
    )
    args = parser.parse_args()

    with open(args.job_json, 'r') as jobfp:
        job_def = json.load(jobfp)

    # Is it one or many objects to install as libraries?
    if os.path.isdir(args.library_path):
        # Directory path specified, grab all files of type args.objective
        # TODO: Decide if this should be recursive or not?
        all_packages = [
            p for p in os.listdir(args.library_path)
            if os.path.splitext(p)[1] == '.' + args.objective
        ]
    else:
        all_packages = [args.library_path]

    # Get the jar's name and its destination folder
    # Replace the job.json's content
    job_def["libraries"] = [
        {args.objective: posixpath.join(args.cloud_path, package)} for package in all_packages
    ]

    # If it's an egg, we use spark_python_task, otherwise it's spark_jar_task
    objective_task_name = "spark_python_task" if args.objective == "egg" else "spark_jar_task"
    if args.objective == "egg":
        # You need a python_file to run the app
        job_def[objective_task_name] = {
            "python_file": args.python_file
        }
    else:
        # You need a main_class_name to run the app
        job_def[objective_task_name] = {
            "main_class_name": args.main_class
        }

    # Parameters is an attribute across egg and jar tasks
    if args.parameters:
        job_def[objective_task_name].update(
            {"parameters": args.parameters}
        )

    # Look to see if the job exists already (title or jar or id)
    JOB_EXISTS = None
    CLI_VERB = 'create'
    if args.update_if_exists:
        cli_output = subprocess.run(['databricks', 'jobs', 'list'], stdout=subprocess.PIPE).stdout
        jobs_on_databricks = parse_jobs_list(cli_output)

        if args.update_if_exists[0] == "job_id":
            if args.update_if_exists[1] in jobs_on_databricks.keys():
                JOB_EXISTS = args.update_if_exists[1]
        elif args.update_if_exists[0] == "name":
            if args.update_if_exists[1] in jobs_on_databricks.values():
                candidate_jobs = list(filter(
                    lambda tup: tup[1] == args.update_if_exists[1],
                    jobs_on_databricks.items()
                ))
                JOB_EXISTS = candidate_jobs[0][0]

    if JOB_EXISTS:
        print("Job {}: {} exists. Updating its specification.".format(
            JOB_EXISTS, jobs_on_databricks[JOB_EXISTS]
        ))
        CLI_VERB = 'reset'
    else:
        print('Deploying a new job')

    # Create the job on Databricks or edit the existing one
    deployment_command = ['databricks', 'jobs', CLI_VERB, '--json', json.dumps(job_def)]
    if CLI_VERB == 'reset':
        deployment_command.extend(['--job-id', JOB_EXISTS])

    if args.profile:
        deployment_command.extend(['--profile', args.profile])

    print('Attempting to run:\n{}'.format(' '.join(deployment_command)))
    call_results = subprocess.run(deployment_command, stdout=subprocess.PIPE).stdout
    print(call_results)
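The `--update-if-exists` lookup in deployment.py relies on `parse_jobs_list`, which scrapes the tabular output of `databricks jobs list` with the regex shown above. A small sketch of how it behaves, assuming deployment.py is importable from the current directory; the two sample output lines and their job ids are invented for illustration, and the real CLI output format may differ by version:

```python
# Sketch: feed parse_jobs_list() bytes shaped like `databricks jobs list` output.
from deployment import parse_jobs_list

fake_cli_output = (
    b"1234  Example Job to Run Deployed Egg\r\n"
    b"5678  Example Job to Run Deployed Jar\r\n"
)
jobs = parse_jobs_list(fake_cli_output)
print(jobs)
# {'1234': 'Example Job to Run Deployed Egg', '5678': 'Example Job to Run Deployed Jar'}
```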
Binary file not shown. After: Width | Height | Size: 78 KiB
Binary file not shown. After: Width | Height | Size: 55 KiB
Binary file not shown. After: Width | Height | Size: 47 KiB