This commit is contained in:
Vinnie 2020-07-07 11:26:20 -04:00
Parent d203b7365b
Commit d4adf891c1
11 changed files with 574 additions and 10 deletions

138
.gitignore vendored Normal file

@@ -0,0 +1,138 @@
# Certificates
*.pem
*.csr
*.crt
params.json
# IDE
.vscode/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/

233
README.md

@@ -1,14 +1,227 @@
# db-dsc-jobs
This code is intended to alleviate the manual effort required to manage jobs in a Databricks cluster. The functionality was designed around specific criteria, and the code provided does not currently cover all potential scenarios.
There are two primary functions in this application: authenticate to a Databricks workspace, and then query, delete, and submit jobs in that workspace. Both the authentication settings and the job definitions are read from JSON files, which need to be created or modified before running the application.
Usage:
```
python job.py --params params.json
```
The Databricks job definition is based on templates rendered with the Jinja2 library. The application reads each file in the `/jobs/` folder and applies its attributes to the appropriate Jinja2 template. Both the `/jobs/` definition files and the `templates/` folder need to be present for the application to function correctly.
## Application Functionality
The application performs the following operations:
1. Reads in the parameter file
2. Acquires an authentication token
3. Queries for existing jobs in the cluster
4. Generates the list of target jobs to delete and resubmit, based on `creator_user_name`
5. Reads in all JSON job definition files from the `/jobs/` directory
6. Deletes only jobs whose name matches a definition in the `/jobs/` directory
7. Creates jobs based on the definitions in the `/jobs/` directory (a condensed sketch of this flow follows)
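For orientation, here is a condensed, illustrative sketch of that flow, using `requests` and Jinja2's `FileSystemLoader` directly and assuming it is run from the repository root. The full implementation, including logging and the package loader, is `job.py` in this repo.
```
# Condensed sketch of the job-management flow described above (see job.py for the real code).
import json, os
import requests
from jinja2 import Environment, FileSystemLoader
import auth

params = json.load(open("params.json"))                           # 1. read parameter file
token = auth.get_auth_token(params)["access_token"]               # 2. acquire token
head = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}
base = params["databricks_uri"] + "/api/2.0/%s"

existing = requests.get(base % "jobs/list", headers=head).json().get("jobs", [])   # 3. query existing jobs
mine = [(j["settings"]["name"], j["job_id"]) for j in existing
        if j["creator_user_name"] == params["client_id"]]         # 4. jobs created by this identity

targets = [json.load(open(e.path)) for e in os.scandir("jobs")
           if e.is_file() and e.path.endswith(".json")]           # 5. read job definitions
target_names = {t["name"] for t in targets}

for name, job_id in mine:                                         # 6. delete matching jobs
    if name in target_names:
        requests.post(base % "jobs/delete", headers=head, json={"job_id": job_id})

tpl = Environment(loader=FileSystemLoader("templates")).get_template("standard.jinja2")
for t in targets:                                                 # 7. recreate jobs from the template
    requests.post(base % "jobs/create", headers=head, data=tpl.render(job=t))
```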
Jinja2 library information can be found here:
https://pypi.org/project/Jinja2/
## Environment Requirements
The following requirements need to be met in your Azure and Databricks environment before this application will work correctly:
Regardless of the authentication approach, the identity used by this application needs to be added to the Databricks instance.
Documentation can be found here: https://docs.databricks.com/dev-tools/api/latest/scim/index.html
The following cURL command is an example of adding a service principal or a managed identity to the Databricks instance.
```
curl -n -X POST \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/scim+json' \
  -H 'Authorization: Bearer <personal access token or generated Bearer token>' \
  -d '{
    "schemas": [
      "urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal"
    ],
    "applicationId": "00000000-0000-0000-0000-000000000000",
    "displayName": "test-msi-or-spn",
    "groups": [
      {
        "value": "my-group"
      }
    ],
    "entitlements": [
      {
        "value": "allow-cluster-create"
      },
      {
        "value": "can-manage"
      }
    ]
  }' \
  https://adb-0000000000000000.0.azuredatabricks.net/api/2.0/preview/scim/v2/ServicePrincipals
```
To confirm that the user was added, call the SCIM API to list service principals:
```
curl -n -X GET -H 'Content-Type: application/json' \
  -H 'Accept: application/scim+json' \
  -H 'Authorization: Bearer <personal access token or generated Bearer token>' \
  https://adb-0000000000000000.0.azuredatabricks.net/api/2.0/preview/scim/v2/ServicePrincipals
```
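The same confirmation can be scripted. A minimal sketch using the Python `requests` library, with the workspace URL and token as placeholders you must supply:
```
# List the service principals registered in the workspace via the SCIM API.
import requests

DATABRICKS_URI = "https://adb-0000000000000000.0.azuredatabricks.net"   # your workspace URL
TOKEN = "<personal access token or generated Bearer token>"

resp = requests.get(
    DATABRICKS_URI + "/api/2.0/preview/scim/v2/ServicePrincipals",
    headers={"Authorization": "Bearer " + TOKEN, "Accept": "application/scim+json"},
)
resp.raise_for_status()
for sp in resp.json().get("Resources", []):
    print(sp.get("applicationId"), sp.get("displayName"))
```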
## Job Template Values
The JSON job templates provided in the `/jobs/` folder contain the base values required for job execution, along with parameter values which can be passed to the job. The parameters are defined as key:value pairs and correlate with the Jinja2 template.
### Job definition file
```
{
"name": "Jinja job example",
"workers": 1,
"notebookpath": "/Covid19",
"par_sourcesystem": "testSource",
"par_sourcesystem_val": "testSource_val",
"par_cdc_volume": "testcdc-volume",
"par_cdc_volume_val": "testcdc-volume_val",
"par_numberofstreams": "testnumberofstreams",
"par_numberofstreams_val": "testnumberofstreams_val",
"par_configfilepath": "testconfigfilepath",
"par_configfilepath_val": "testconfigfilepath_val",
"description": "Not used in template, for reference"
}
```
### Jinja2 template snippet
```
...
"notebook_task": {
"notebook_path": "{{ job.notebookpath}}",
"base_parameters": {
"{{ job.par_sourcesystem }}": "{{ job.par_sourcesystem_val }}",
"{{ job.par_cdc_volume }}": "{{ job.par_cdc_volume_val }}",
"{{ job.par_numberofstreams }}": "{{ job.par_numberofstreams_val }}",
"{{ job.par_configfilepath }}": "{{ job.par_configfilepath_val }}"
}
...
```
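For reference, this is roughly how a job definition file is combined with the template. A minimal standalone sketch, assuming it is run from the repository root and using Jinja2's `FileSystemLoader` (`job.py` itself uses `PackageLoader`):
```
import json
from jinja2 import Environment, FileSystemLoader

# FileSystemLoader is used here only so the snippet runs outside the package
# context; job.py uses PackageLoader('job', 'templates').
env = Environment(loader=FileSystemLoader("templates"))
template = env.get_template("standard.jinja2")

with open("jobs/job1.json") as f:
    job = json.load(f)

payload = template.render(job=job)   # JSON string ready to POST to jobs/create
print(payload)
```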
### Databricks job definition
After the job has been created, the mapped parameters can be seen in the job definition within Databricks.
![](img/JinjaJobExample.png)
## Authentication Parameter File
The parameter file will have the following values present, depending on which authentication method is used.
|Attribute |Value |
|----------|----------------|
|`authority_type`|Designates the method of authentication (`spn-cert`, `spn-key`, `msi`, `pat`)|
|`authority`|Endpoint for the authentication process |
|`client_id`|Application (Client) ID or Email/UPN|
|`resource`|Static value for the Databricks resource provider (`2ff814a6-3304-4ab8-85cb-cd0e6f879c1d`)|
|`databricks_uri`|Databricks base URI|
|`cert_thumbprint`|When using a certificate, the thumbprint value from the file|
|`private_key_file`|When using a certificate, local location of the private key file|
|`client_secret`|When using Key authentication for a service principal, this is the secret generated from AAD|
|`pat_token`|Personal Access Token generated from Databricks|
### Authority Type Details
|Authority Type | authority_type value | Client ID | client_id value |
|---------------|----------------------|-----------|---------|
|Service Principal using Certificate | `spn-cert`|Application (client) ID from AAD|`00000000-0000-0000-0000-000000000000`|
|Service Principal using Key | `spn-key` |Application (client) ID from AAD|`00000000-0000-0000-0000-000000000000`|
|Managed Service Identity | `msi` |Client ID from resource|`00000000-0000-0000-0000-000000000000`|
|Personal Access Token | `pat` |Email/UPN|`user@domain.com`|
For service principal authentication, the allowed methods are a certificate associated with the identity or a generated key. To acquire the authentication token required to communicate with the Databricks REST APIs, a scope specific to the Databricks resource provider in Azure must be specified. That attribute in the parameter file is `resource`, which is appended with `/.default` in the token generation API call. The required MSAL library facilitates this API call.
During the testing of this application, a local certificate was generated using the method described here: https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Client-Credentials
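A minimal sketch of that token acquisition with MSAL, using placeholder values that mirror the parameter file (this is essentially what `auth.py` does for the `spn-cert` and `spn-key` cases):
```
import msal

AUTHORITY = "https://login.microsoftonline.com/<tenant_id>"
CLIENT_ID = "<client-id>"
RESOURCE = "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d"   # Databricks resource provider

app = msal.ConfidentialClientApplication(
    CLIENT_ID,
    authority=AUTHORITY,
    # For spn-cert; pass a plain client secret string here instead for spn-key
    client_credential={
        "thumbprint": "<cert_thumbprint>",
        "private_key": open("<private_key_file.ext>").read(),
    },
)
result = app.acquire_token_for_client(scopes=[RESOURCE + "/.default"])
access_token = result["access_token"]
```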
Managed Identity (or Managed Service Identity, MSI) is a resource that is either created independently in Azure (user-assigned) or created and associated with a resource when that resource is created in Azure (system-assigned). MSIs do not use the MSAL library; instead, they call a known endpoint from the resource they are associated with in Azure. In our testing, we created a managed identity and associated it with a VM. We were only able to successfully generate a token when calling the known endpoint from within that VM. The resource scope is also a necessary part of that API call and is specific to Databricks.
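A minimal sketch of that call, using the IMDS endpoint shown in the `msi` example below and placeholder values; it only works from inside the Azure resource the identity is attached to:
```
import requests

# Must be run from inside the Azure resource (a VM in our testing) that the
# managed identity is attached to.
IMDS = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01"
RESOURCE = "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d"   # Databricks resource provider
CLIENT_ID = "<client-id>"

resp = requests.get(
    IMDS,
    params={"resource": RESOURCE, "client_id": CLIENT_ID},
    headers={"Metadata": "true"},
)
access_token = resp.json()["access_token"]
```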
Personal Access Tokens allow for authenticating directly against the Databricks instance, since the token is generated from the service.
### Authentication Examples
```
{
"authority_type": "spn-cert"
"authority": "https://login.microsoftonline.com/<tenant_id>"
"client_id": "<client-id>"
"resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d"
"databricks_uri": "https://<adb_instance>.azuredatabricks.net",
"cert_thumbprint": "<cert_thumbprint>",
"private_key_file": "<private_key_file.ext>"
}
```
```
{
"authority_type": "spn-key"
"authority": "https://login.microsoftonline.com/<tenant_id>"
"client_id": "<your-sp-client-id>"
"resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d"
"databricks_uri": "https://<adb_instance>.azuredatabricks.net",
"client_secret": "<client_secret>"
}
```
```
{
"authority_type": "msi"
"authority": "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01"
"client_id": "<client-id>"
"resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d",
"databricks_uri": "https://<adb_instance>.azuredatabricks.net"
}
```
```
{
"authority_type": "pat"
"client_id": "<client-id>"
"databricks_uri": "https://<adb_instance>.azuredatabricks.net"
"pat_token": "<pat_token>"
}
```
## Additional Resources
For testing purposes, the following cURL call can be used to generate a token using the values required in the parameter file. This example uses a service principal authenticating with a key:
```
curl -X POST \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'grant_type=client_credentials&client_id=<client_id>&resource=2ff814a6-3304-4ab8-85cb-cd0e6f879c1d&client_secret=<client_secret>' \
  https://login.microsoftonline.com/<tenant_id>/oauth2/token
```
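Once a token has been generated by any of the methods above, it can be verified against the Databricks Jobs API. A minimal sketch in Python, with placeholder values:
```
import requests

# Placeholder values; TOKEN is the access_token returned by the call above.
DATABRICKS_URI = "https://<adb_instance>.azuredatabricks.net"
TOKEN = "<access_token>"

resp = requests.get(
    DATABRICKS_URI + "/api/2.0/jobs/list",
    headers={"Authorization": "Bearer " + TOKEN},
)
print(resp.status_code, resp.json())
```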
## Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.
When you submit a pull request, a CLA bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.

27
auth.py Normal file

@@ -0,0 +1,27 @@
import sys, json, requests, msal

# Cert based authentication using a service principal against MSFT AAD common endpoint (v2)
# This is using MSAL over ADAL and uses a private key with a thumbprint loaded onto the service principal for authentication
# TODO: provide instructions in README on how to get the .pem
def get_auth_token(paramFile):
    result = None
    auth = paramFile["authority_type"]
    if auth == "msi":
        result = json.loads(requests.get(paramFile["authority"] + "&resource=" + paramFile["resource"] + "&client_id=" + paramFile["client_id"], headers={"Metadata": "true"}).text)
    elif auth == "spn-cert" or auth == "spn-key":
        app = msal.ConfidentialClientApplication(
            paramFile["client_id"], authority=paramFile["authority"],
            client_credential={"thumbprint": paramFile["cert_thumbprint"], "private_key": open(paramFile['private_key_file']).read()} if auth == "spn-cert" else paramFile["client_secret"]
        )
        result = app.acquire_token_for_client(scopes=[paramFile["resource"] + "/.default"])
    elif auth == "pat":
        result = {'access_token': paramFile["pat_token"]}
    else:
        # TODO: Raise exception
        result = ""
    return result
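For reference, a minimal usage sketch of `get_auth_token`, roughly how `job.py` invokes it; `params.json` follows the parameter file format documented in the README:
```
import json
import auth

# params.json follows the README's parameter file format.
params = json.load(open("params.json"))
token = auth.get_auth_token(params)
print(token["access_token"][:20] + "...")
```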

Binary data
img/JinjaJobExample.png Normal file

Binary file not shown. (88 KiB)

90
job.py Normal file

@@ -0,0 +1,90 @@
import json, socket, argparse, sys, auth
import requests
import os
from jinja2 import Environment, PackageLoader
from datetime import datetime

current_time = datetime.now().strftime("%H:%M:%S")

# Optional: lift to env files.
parser = argparse.ArgumentParser(description='DSC job management for Databricks')
parser.add_argument('--params', type=str, help='your Databricks and Azure parameter file', default='params.json')
args = parser.parse_args()

configuration = json.load(open(args.params))
auth_token = auth.get_auth_token(configuration)
databricks_uri = configuration['databricks_uri'] + "/api/2.0/%s"

# Settings for jinja2
tplenv = Environment(loader=PackageLoader('job','templates'))
tplenv.filters['jsonify'] = json.dumps

# Settings for request/urllib3
head = {'Authorization': 'Bearer ' + auth_token["access_token"], 'Content-Type': 'application/json'}

# Get something from Databricks, parse to JSON if asked
def get_db(action, returnJson=False):
    url = databricks_uri % action
    log("REST - GET - Calling %s" % action)
    response = requests.get(url, headers=head)
    return response.json() if returnJson else response

# Post something to Databricks
def post_db(action, jsonpl, returnJson=False):
    url = databricks_uri % action
    log("REST - POST - Calling %s" % action)
    response = requests.post(url, headers=head, data=jsonpl)
    return response

# Delete a job, this is a guaranteed operation by the Databricks API on successful ack.
def delete_job(id):
    log("Deleting %s" % id)
    tpl = tplenv.get_template('delete.jinja2').render(id=id)
    result = post_db("jobs/delete", tpl)
    if(result.ok):
        log("Deleted job %s" % id)
    else:
        log("Error deleting job: %s" % result.content)

# Helper to print timestamps
def log(s):
    print("[%s] %s" % (current_time, s))

def main():
    log("Running execution against %s" % configuration['databricks_uri'])
    current_jobs = get_db("jobs/list", returnJson=True)
    current_jobnames = []
    if(len(current_jobs) > 0):
        log("Total of %s jobs found" % len(current_jobs['jobs']))
        current_jobnames = [(j['settings']['name'], j['job_id']) for j in current_jobs['jobs'] if j['creator_user_name'] == configuration["client_id"]]
    else:
        log("No jobs")
    # Set up definition based on input from Molly
    target_jobs = [json.load(open(jobcfg)) for jobcfg in os.scandir('jobs') if(jobcfg.is_file() and jobcfg.path.endswith('.json'))]
    target_jobnames = [j['name'] for j in target_jobs]
    # All jobs that need to be deleted
    jobs_to_delete = filter(lambda x: x[0] in target_jobnames, current_jobnames)
    # Delete active jobs for the name in job1
    # TODO: The above definitions need to come from a folder in DBFS, then loop over them and pull.
    [delete_job(item[1]) for item in jobs_to_delete]
    # Create a new job with the name above
    template = tplenv.get_template('standard.jinja2')
    for x in target_jobs:
        task = template.render(job=x)
        result = post_db("jobs/create", task).json()
        log("Created a new job %s" % result['job_id'])

# Module hook
if __name__ == '__main__':
    main()

14
jobs/job1.json Normal file

@@ -0,0 +1,14 @@
{
"name": "Jinja job example",
"workers": 1,
"notebookpath": "/Covid19",
"par_sourcesystem": "testSource",
"par_sourcesystem_val": "testSource_val",
"par_cdc_volume": "testcdc-volume",
"par_cdc_volume_val": "testcdc-volume_val",
"par_numberofstreams": "testnumberofstreams",
"par_numberofstreams_val": "testnumberofstreams_val",
"par_configfilepath": "testconfigfilepath",
"par_configfilepath_val": "testconfigfilepath_val",
"description": "Not used in template, for reference"
}

16
params.template.json Normal file

@@ -0,0 +1,16 @@
{
"authority_type": "",
"authority": "",
"client_id": "",
"resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d",
"databricks_uri": "",
"cert_thumbprint": "",
"private_key_file": "",
"client_secret": "",
"pat_token": ""
}

3
requirements.txt Normal file

@@ -0,0 +1,3 @@
jinja2
requests
msal

1
templates/delete.jinja2 Normal file

@@ -0,0 +1 @@
{ "job_id": {{ id }} }

35
templates/standard.jinja2 Normal file

@@ -0,0 +1,35 @@
{
"name": "{{ job.name }}",
"new_cluster": {
"spark_version": "6.5.x-scala2.11",
"node_type_id": "Standard_DS3_v2",
"num_workers": {{ job.workers }}
},
"libraries": [
{
"jar": "dbfs:/my-jar.jar"
},
{
"maven": {
"coordinates": "org.jsoup:jsoup:1.7.2"
}
}
],
"notebook_task": {
"notebook_path": "{{ job.notebookpath}}",
"base_parameters": {
"{{ job.par_sourcesystem }}": "{{ job.par_sourcesystem_val }}",
"{{ job.par_cdc_volume }}": "{{ job.par_cdc_volume_val }}",
"{{ job.par_numberofstreams }}": "{{ job.par_numberofstreams_val }}",
"{{ job.par_configfilepath }}": "{{ job.par_configfilepath_val }}"
}
},
"timeout_seconds": 3600,
"max_retries": 1,
"schedule": {
"quartz_cron_expression": "0 15 22 ? * *",
"timezone_id": "America/New_York"
}
}

27
(file name not shown) Normal file

@@ -0,0 +1,27 @@
{
"name": "{{ job.name }}",
"new_cluster": {
"spark_version": "5.3.x-scala2.11",
"node_type_id": "Standard_D3_v2",
"num_workers": {{ job.workers }}
},
"libraries": [
{
"jar": "dbfs:/my-jar.jar"
},
{
"maven": {
"coordinates": "org.jsoup:jsoup:1.7.2"
}
}
],
"timeout_seconds": 3600,
"max_retries": 1,
"schedule": {
"quartz_cron_expression": "0 15 22 ? * *",
"timezone_id": "America/Los_Angeles"
},
"spark_jar_task": {
"main_class_name": "com.databricks.ComputeModels"
}
}