Adding Notebook Job Deployment and Error Handling (#5)

* Adding notebook job functionality

```
python deployment.py notebook \
na \
na \
local/path/to/job.json \
--notebook-path "/Databricks/Path/"
```

* Adding error handling logic and simplifying the job json for notebook

* Updated notebook release image
Will Johnson 2020-03-23 11:03:48 -05:00 committed by GitHub
Parent a225696521
Commit 7b0e9ac5df
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
5 changed files: 108 additions and 39 deletions

.gitignore (vendored): 2 changes

@@ -1,3 +1,5 @@
.vscode/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

Notebook/job.json (new file): 10 additions

@@ -0,0 +1,10 @@
{
"name": "Example Job to Run a Notebook",
"new_cluster": {
"spark_version": "6.2.x-scala2.11",
"node_type_id": "Standard_DS3_v2",
"num_workers": 1
},
"timeout_seconds": 3600,
"max_retries": 1
}

@@ -175,6 +175,14 @@ Add the following tasks to both the QA and Prod stages (Pro Tip: You can do this
* Set Notebooks Folder to `$(System.DefaultWorkingDirectory)/_code/Notebook`
* Set Workspace Folder to `/Shared`
* This will recreate the entire `_code/Notebook` structure (notebooks and folders) in the `/Shared/` folder in Databricks.
1. Python Script
* Script Source: File Path
* Script Path: `$(System.DefaultWorkingDirectory)/_code/deployment.py`
* Arguments:
```
notebook na na $(System.DefaultWorkingDirectory)/_code/Notebook/job.json --notebook-path "/SparkSimpleNotebook" --profile AZDO --parameters "input" "/mnt/input/bank/bank.csv" "output" "/mnt/output/SparkSimpleAppNotebook/test"
```
To add further to this example, you might:
* Deploy your notebook to a versioned folder numbered by the [Pre-defined release variable](https://docs.microsoft.com/en-us/azure/devops/pipelines/release/variables?view=azure-devops&tabs=batch#default-variables) `Release.DeploymentID`, as sketched below.
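A minimal sketch of that idea for the Python Script task's arguments, assuming the Databricks Notebooks deployment task's Workspace Folder targets the same versioned folder (the folder layout shown here is illustrative, not prescriptive):
```
notebook na na $(System.DefaultWorkingDirectory)/_code/Notebook/job.json --notebook-path "/Shared/SparkSimpleNotebook/$(Release.DeploymentID)" --profile AZDO
```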
@@ -183,6 +191,22 @@ To add further to this example:
You now have a working pipeline to deploy Azure Databricks Notebooks! Save and execute the Release!
## Additional Functionality
### Update an Existing Job by Name or Job_ID
This deployment script does NOT help with identifying existing jobs and shutting them down.
Instead, you can specify the `--update-if-exists` flag to change an existing job if it is found.
You can pass `--update-if-exists job_id XXX`, where XXX is a known job-id, and the script will update the job found when looking up that job-id.
Alternatively, you can pass the job's name, e.g. `--update-if-exists name my_job_name`. This looks up the job-id of the first match it finds (ordering not guaranteed) based on the job name; example invocations follow the list below. The downsides of matching by name are:
* Databricks allows for multiple jobs to share the same name (why?!?)
* The output of `databricks jobs list` appears to be ordered with the smallest job-id first.
* You could miss existing active jobs with schedules if they share the same name but have later job-ids.
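For example, either form can be appended to the notebook deployment command shown earlier; the job-id `1234` and the job name `my_job_name` below are placeholders:
```
python deployment.py notebook na na local/path/to/job.json \
    --notebook-path "/Databricks/Path/" \
    --update-if-exists job_id 1234

python deployment.py notebook na na local/path/to/job.json \
    --notebook-path "/Databricks/Path/" \
    --update-if-exists name my_job_name
```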
# deployment.py
The deployment.py file helps abstract the calls to the Databricks CLI and enables you to replace text in the job's json definition.
@@ -191,27 +215,34 @@ The help file below describes the usage.
```
usage: deployment.py [-h] [--python-file PYTHON_FILE]
[--main-class MAIN_CLASS] [--profile PROFILE]
[--main-class MAIN_CLASS] [--notebook-path NOTEBOOK_PATH]
[--profile PROFILE]
[--update-if-exists UPDATE_IF_EXISTS UPDATE_IF_EXISTS]
[--parameters ...]
{jar,egg} library_path cloud_path job_json
{jar,egg,notebook} library_path cloud_path job_json
Deploy a set of jar or egg files as a Spark application
positional arguments:
{jar,egg} Valid options are jar or egg
library_path The library or folder containing libraries to include
{jar,egg,notebook} Valid options are jar, egg, or notebook
library_path The library or folder containing libraries to include.
Use na for no libraries.
cloud_path The path in the cloud (e.g. DBFS, WASB) that the
library is located
library is located. Use na for no libraries.
job_json The path to the job definition (only applicable to
Databricks)
optional arguments:
-h, --help show this help message and exit
--python-file PYTHON_FILE
The python file that runs the python application
(egg option) The python file that runs the python
application
--main-class MAIN_CLASS
The main class of your scala jar application
(jar option) The main class of your scala jar
application
--notebook-path NOTEBOOK_PATH
(notebook option) The path to your notebook in your
Databricks workspace
--profile PROFILE Profile name to be passed to the databricks CLI
--update-if-exists UPDATE_IF_EXISTS UPDATE_IF_EXISTS
Looks for a job_id or name (useful only for Databricks

deployment.py

@@ -30,18 +30,20 @@ if __name__ == "__main__":
)
parser.add_argument('objective',
default="jar",
choices=["jar","egg"],
help="Valid options are jar or egg")
choices=["jar","egg", "notebook"],
help="Valid options are jar, egg, or notebook")
parser.add_argument('library_path',
help="The library or folder containing libraries to include")
help="The library or folder containing libraries to include. Use na for no libraries.")
parser.add_argument('cloud_path',
help="The path in the cloud (e.g. DBFS, WASB) that the library is located")
help="The path in the cloud (e.g. DBFS, WASB) that the library is located. Use na for no libraries.")
parser.add_argument('job_json',
help="The path to the job definition (only applicable to Databricks)")
parser.add_argument('--python-file',
help="The python file that runs the python application")
help="(egg option) The python file that runs the python application")
parser.add_argument('--main-class',
help="The main class of your scala jar application")
help="(jar option) The main class of your scala jar application")
parser.add_argument('--notebook-path',
help="(notebook option)The path to your notebook in your databricks workspace")
parser.add_argument('--profile',
default=None,
help="Profile name to be passed to the databricks CLI"
@@ -58,41 +60,67 @@ if __name__ == "__main__":
)
args = parser.parse_args()
map_objective_to_task = {
"jar": "spark_jar_task",
"egg": "spark_python_task",
"notebook": "notebook_task"
}
with open(args.job_json, 'r') as jobfp:
job_def = json.load(jobfp)
# If the library path attribute is na then skip adding libraries
# Is it one or many objects to install as libraries?
if os.path.isdir(args.library_path):
# Directory path specified, grab all files of type args.objective
# TODO: Decide if this should be recursive or not?
all_packages = [
p for p in os.listdir(args.library_path)
if os.path.splitext(p)[1] == '.' + args.objective
if args.library_path.strip().lower() != "na":
if os.path.isdir(args.library_path):
# Directory path specified, grab all files of type args.objective
# TODO: Decide if this should be recursive or not?
all_packages = [
p for p in os.listdir(args.library_path)
if os.path.splitext(p)[1] == '.' + args.objective
]
else:
all_packages = [args.library_path]
# Get the library's name and its destination folder
# Replace the job.json's content
job_def["libraries"] = [
{args.objective: posixpath.join(args.cloud_path, package)} for package in all_packages
]
else:
all_packages = [args.library_path]
# Get the Jar's name and its destination folder
# Replace the job.json's content
job_def["libraries"] = [
{args.objective: posixpath.join(args.cloud_path, package)} for package in all_packages
]
# If it's an egg, we use spark_python_task, otherwise it's spark_jar_task
objective_task_name = "spark_python_task" if args.objective == "egg" else "spark_jar_task"
if args.objective == "egg":
# Get the task type based on the passed in objective
objective_task_name = map_objective_to_task[args.objective]
if objective_task_name == "spark_python_task":
# You need a python_file to run the app
job_def[objective_task_name] = {
"python_file": args.python_file
}
else:
elif objective_task_name == "spark_jar_task":
# You need a main_class_name to run the app
job_def[objective_task_name] = {
"main_class_name": args.main_class
}
else:
# Assuming notebook task
job_def[objective_task_name] = {
"notebook_path": args.notebook_path
}
# Adding in parameters if they are available
if args.parameters:
# Assuming --parameters key1 value1 key2 value2
# If it's not an even set of pairs
if len(args.parameters) % 2 != 0:
raise IndexError("Parameters passed into a notebook task must be an even number of attributes as it assumes key, value pairs")
pair_indexes = [x for x in range(len(args.parameters)) if x %2 == 0]
job_def[objective_task_name].update(
{"base_parameters": {args.parameters[x]:args.parameters[x+1] for x in pair_indexes }}
)
# Back to the main flow
# Parameters is an attribute across egg and jar tasks
if args.parameters:
if args.parameters and objective_task_name != "notebook_task":
job_def[objective_task_name].update(
{"parameters":args.parameters}
)
@@ -170,10 +198,8 @@ if __name__ == "__main__":
print("Succeeded in performing {} with job_id {}".format(
CLI_VERB, resulting_job_id
))
# This prints a specific format for Azure DevOps to pick up output as a variable
print("##vso[task.setvariable variable=DBR_JOB_ID]{}".format(resulting_job_id))
if call_results_json.decode('utf-8').lower().startswith("error"):
raise Exception("The Databricks Job deployment failed with error: {}".format(call_results_json.decode('utf-8')))
else:
# This prints a specific format for Azure DevOps to pick up output as a variable
print("##vso[task.setvariable variable=DBR_JOB_ID]{}".format(resulting_job_id))

docs/img/dbr-py-notebook-release.PNG (binary data)

Binary file not shown.

Before: 48 KiB | After: 60 KiB