Add exit handler for AzureDevOps (#56)

Sushant Divate 2020-06-26 11:27:26 -07:00 committed by GitHub
Parent: 0ace098245
Commit: 83de056570
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
14 changed files: 265 additions and 19 deletions
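
For context, the heart of this change is Kubeflow Pipelines' exit-handler construct: an op registered with dsl.ExitHandler runs after the wrapped ops finish, whether they succeed or fail, which is what lets the pipeline report back to Azure DevOps. A minimal sketch of the pattern, assuming the kfp v1 SDK (op names and images here are illustrative, not this pipeline's real components):

# Minimal sketch of the KFP v1 exit-handler pattern; the commit's real
# exit op is the azdocallback container defined below.
import kfp.dsl as dsl


@dsl.pipeline(name='exit-handler-demo')
def demo_pipeline():
    exit_op = dsl.ContainerOp(
        name='callback',
        image='alpine',
        command=['echo', 'pipeline finished'])
    # Ops created inside the handler run first; exit_op always runs
    # last, regardless of their outcome.
    with dsl.ExitHandler(exit_op=exit_op):
        dsl.ContainerOp(
            name='work',
            image='alpine',
            command=['echo', 'doing work'])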

View file

@@ -49,6 +49,15 @@ stages:
# Disabling building all other images, uncomment to include specific component from code\pipelineazdo.py
# - task: Docker@2
#   displayName: Build and Push AzdoCallback Image
#   inputs:
#     containerRegistry: 'kubeflow-azdo-sample-acr-sc'
#     repository: 'mexicanfood/azdocallback'
#     command: 'buildAndPush'
#     Dockerfile: 'code/azdocallback/Dockerfile'
#     buildContext: 'code/azdocallback/'
#     tags: 'latest'
# - task: Docker@2
#   displayName: Build and Push Preprocess Image
#   inputs:
#     containerRegistry: 'kubeflow-azdo-sample-acr-sc'

View file

@@ -0,0 +1,11 @@
FROM python:3.7-slim
# pip install dependencies
COPY requirements.txt /scripts/requirements.txt
RUN pip install -r /scripts/requirements.txt
COPY azdocallback.py /scripts/azdocallback.py
# only for local testing; will be overwritten by the kf pipeline's command
ENTRYPOINT [ "python", "/scripts/azdocallback.py" ]

View file

@@ -0,0 +1,72 @@
import json
import requests
import time
import argparse
import kfp
import adal


def info(msg, char="#", width=75):
    print("")
    print(char * width)
    print(char + " %0*s" % ((-1 * width) + 5, msg) + char)
    print(char * width)


def send_complete_event(callbackinfo, status):  # noqa: E501
    # Post a TaskCompleted event back to the Azure DevOps agentless
    # task that is waiting on this pipeline run.
    callback_vars = json.loads(callbackinfo.replace("'", '"'))  # noqa: E501
    url = r"{planUri}/{projectId}/_apis/distributedtask/hubs/{hubName}/plans/{planId}/events?api-version=2.0-preview.1".format(  # noqa: E501
        planUri=callback_vars["PlanUri"], projectId=callback_vars["ProjectId"], hubName=callback_vars["HubName"], planId=callback_vars["PlanId"])  # noqa: E501
    data = {'name': 'TaskCompleted',
            'taskId': callback_vars["TaskInstanceId"], 'jobId': callback_vars["JobId"], 'result': status}  # noqa: E501
    header = {'Authorization': 'Bearer ' + callback_vars["AuthToken"]}
    response = requests.post(url, json=data, headers=header)
    print(response)


def get_component_status(kfp_host_url, kfp_run_id, token=None):
    # Walk the Argo workflow nodes of the run and report "failed" if
    # any pod-level component did not succeed.
    status = "succeeded"
    client = kfp.Client(host=kfp_host_url,
                        existing_token=token)
    run_response = client.get_run(kfp_run_id)
    workflow_manifest = json.loads(
        run_response.pipeline_runtime.workflow_manifest)  # noqa: E501
    time.sleep(5)  # Status propagation from Argo to Kubeflow takes time
    for (k, v) in workflow_manifest['status']['nodes'].items():
        if v['type'] == "Pod":
            if v['phase'] == "Failed":
                status = "failed"
                info("Failed Component: " + v['displayName'])
    return status


def get_access_token(tenant, clientId, client_secret):
    authorityHostUrl = "https://login.microsoftonline.com"
    GRAPH_RESOURCE = '00000002-0000-0000-c000-000000000000'
    authority_url = authorityHostUrl + '/' + tenant
    context = adal.AuthenticationContext(authority_url)
    token = context.acquire_token_with_client_credentials(GRAPH_RESOURCE, clientId, client_secret)  # noqa: E501
    return token['accessToken']


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Azure DevOps Callback')
    # '-h' is reserved for argparse's built-in help, so use '-u' here
    parser.add_argument('-u', '--kfp_host_url',
                        help='Kubeflow Host Url')
    parser.add_argument('-azcb', '--azdocallback',
                        help='Azure DevOps call back Info')
    parser.add_argument('-id', '--run_id',
                        help='Kubeflow Pipeline Run Id')
    parser.add_argument('-t', '--tenant_id', help='tenant_id')
    parser.add_argument('-s', '--service_principal_id',
                        help='service_principal_id')
    parser.add_argument('-p', '--service_principal_password',
                        help='service_principal_password')
    args = parser.parse_args()
    status = get_component_status(args.kfp_host_url, args.run_id, get_access_token(  # noqa: E501
        args.tenant_id, args.service_principal_id, args.service_principal_password))  # noqa: E501
    send_complete_event(args.azdocallback, status)
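
For reference, the --azdocallback argument parsed above is a dict serialized with single quotes (hence the replace("'", '"') before json.loads). An illustrative shape, with every value a placeholder rather than a real Azure DevOps identifier:

# Hypothetical callback blob; field names match what
# send_complete_event reads, values are placeholders only.
callbackinfo = str({
    'PlanUri': 'https://dev.azure.com/your-org',
    'ProjectId': '<project-guid>',
    'HubName': 'build',
    'PlanId': '<plan-guid>',
    'TaskInstanceId': '<task-guid>',
    'JobId': '<job-guid>',
    'AuthToken': '<system-access-token>',
})
# send_complete_event(callbackinfo, 'succeeded') then posts a
# TaskCompleted event to the waiting agentless task.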

View file

@@ -0,0 +1,3 @@
requests
kfp
adal

View file

@@ -0,0 +1,20 @@
FROM python:3.7-slim
# ARG host
# ARG token
# ENV DATABRICKS_HOST=host
# ENV DATABRICKS_TOKEN=token
RUN pip install 'mlflow>=1.0' && \
    pip install databricks-cli && \
    apt-get update && \
    apt-get install git-all -y
COPY . /scripts/
WORKDIR /scripts
#ENTRYPOINT ["./run.sh"]
CMD [ "python3", "./run.py" ]

View file

@@ -0,0 +1,20 @@
name: mlflowproject
conda_env: conda.yml
entry_points:
  preprocess:
    parameters:
      training_data: {type: string, default: "./path/to/data/"}
    command: "python preprocess.py {training_data}"
  train:
    parameters:
      training_data: {type: string, default: "./path/to/data/"}
    command: "python train.py {training_data}"
  main:
    parameters:
      training_data: {type: string, default: "./path/to/data/"}
    command: "python main.py {training_data}"

View file

@@ -0,0 +1,5 @@
{
    "spark_version": "6.5.x-cpu-ml-scala2.11",
    "node_type_id": "Standard_DS3_v2",
    "num_workers": 4
}

View file

@@ -0,0 +1,15 @@
name: multistepmlflow
channels:
  - defaults
  - anaconda
  - conda-forge
dependencies:
  - python=3.6
  - pyspark
  - requests
  - click
  - pip
  - pip:
    - tensorflow==1.15.2
    - keras==2.2.4
    - mlflow>=1.0

View file

@@ -0,0 +1,18 @@
import click
import mlflow


@click.command()
@click.argument("training_data")
def workflow(training_data):
    with mlflow.start_run() as active_run:  # noqa: F841, E501
        preprocess_run = mlflow.run(".", "preprocess", parameters={  # noqa: F841, E501
            "training_data": training_data})
        train_model_run = mlflow.run(".", "train", parameters={  # noqa: F841, E501
            "training_data": training_data})


if __name__ == '__main__':
    workflow()

View file

@@ -0,0 +1,11 @@
import click


@click.command()
@click.argument("training_data")
def preprocess(training_data):
    print(training_data)


if __name__ == '__main__':
    preprocess()

code/mlfow-project/run.py Normal file (35 lines added)

View file

@@ -0,0 +1,35 @@
import mlflow
import os
import click


@click.command()
@click.option("--experiment_id", type=click.INT)
@click.option("--kf_run_id", type=click.STRING)
def run(experiment_id, kf_run_id):
    mlflow.set_tracking_uri("databricks")
    submitted_run = mlflow.run(".",
                               entry_point="main",
                               experiment_name=None,
                               experiment_id=experiment_id,
                               parameters=None,
                               backend='databricks',
                               backend_config='clusterconfig.json',
                               )
    # Fetch the final run record from the tracking server
    mlflowClient = mlflow.tracking.MlflowClient().get_run(submitted_run.run_id)
    if mlflowClient.info.status != "FINISHED":
        raise Exception("MLflow Experiment failed")
    print("Experiment Completed")
    print("Status: " + mlflowClient.info.status)
    print("MLFLOW Run ID: " + mlflowClient.info.run_id)
    print("MLFLOW Artifact URI: " + mlflowClient.info.artifact_uri)
    print("KubeFlow Run ID: " + kf_run_id)


if __name__ == '__main__':
    # Run relative to this script's directory so MLproject resolves
    abspath = os.path.abspath(__file__)
    dname = os.path.dirname(abspath)
    os.chdir(dname)
    print(os.path.abspath(os.getcwd()))
    run()
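
One note on set_tracking_uri("databricks"): MLflow resolves Databricks credentials from the environment or ~/.databrickscfg at that point. A hypothetical fail-fast check, not part of this commit, could look like:

# Illustrative preflight: the "databricks" tracking URI needs these
# variables (or a configured ~/.databrickscfg) to authenticate.
import os

for var in ('DATABRICKS_HOST', 'DATABRICKS_TOKEN'):
    if not os.environ.get(var):
        raise EnvironmentError(var + ' must be set before mlflow.run()')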

code/mlfow-project/run.sh Normal file (18 lines added)

View file

@@ -0,0 +1,18 @@
#!/bin/bash
export DATABRICKS_HOST='YOUR DOMAIN'
export DATABRICKS_TOKEN='SECRET'
echo $DATABRICKS_HOST
echo $DATABRICKS_TOKEN
echo 'Login to Databricks CLI'
# databricks configure --token << ANSWERS
# $DATABRICKS_HOST
# $DATABRICKS_TOKEN
# ANSWERS
# List the workspace root as a quick check that the CLI is authenticated
databricks workspace ls
export MLFLOW_TRACKING_URI=databricks
mlflow run . -e main --experiment-id 1315550910283717 -b databricks --backend-config clusterconfig.json

View file

@@ -0,0 +1,11 @@
import click


@click.command()
@click.argument("training_data")
def train(training_data):
    print(training_data)


if __name__ == '__main__':
    train()

View file

@@ -54,23 +54,6 @@ def use_databricks_secret(secret_name='databricks-secret'):
    return _use_databricks_secret


def azdocallback(callbackinfo):
    if callbackinfo is not None:
        import sys
        import subprocess
        subprocess.run([sys.executable, '-m', 'pip', 'install', 'requests'])
        import requests  # noqa: F811,E501
        import json  # noqa: F811,E501
        callback_vars = json.loads(callbackinfo.replace("'", '"'))  # noqa: E501
        url = r"{planUri}/{projectId}/_apis/distributedtask/hubs/{hubName}/plans/{planId}/events?api-version=2.0-preview.1".format(  # noqa: E501
            planUri=callback_vars["PlanUri"], projectId=callback_vars["ProjectId"], hubName=callback_vars["HubName"], planId=callback_vars["PlanId"])  # noqa: E501
        data = {'name': 'TaskCompleted',
                'taskId': callback_vars["TaskInstanceId"], 'jobId': callback_vars["JobId"], 'result': 'succeeded'}  # noqa: E501
        header = {'Authorization': 'Bearer ' + callback_vars["AuthToken"]}
        response = requests.post(url, json=data, headers=header)
        print(response)
def tacosandburritos_train(
        resource_group,
        workspace,

@@ -91,9 +74,24 @@ def tacosandburritos_train(
    model_folder = 'model'
    image_repo_name = "kubeflowyoacr.azurecr.io/mexicanfood"
    mlflow_url = 'http://mlflow:5000'
    kfp_host_url = 'http://51.143.118.153/pipeline'
    exitop = comp.func_to_container_op(azdocallback)
    with dsl.ExitHandler(exit_op=exitop(azdocallbackinfo)):
    exit_op = dsl.ContainerOp(
        name='Exit Handler',
        image=image_repo_name + '/azdocallback:latest',
        command=['python'],
        arguments=[
            '/scripts/azdocallback.py',
            '--kfp_host_url', kfp_host_url,
            '--azdocallback', azdocallbackinfo,
            '--run_id', dsl.RUN_ID_PLACEHOLDER,
            '--tenant_id', "$(AZ_TENANT_ID)",
            '--service_principal_id', "$(AZ_CLIENT_ID)",
            '--service_principal_password', "$(AZ_CLIENT_SECRET)",
        ]
    ).apply(use_azure_secret())
    with dsl.ExitHandler(exit_op=exit_op):
        operations['mlflowproject'] = dsl.ContainerOp(
            name='Run MLflow Project on Azure Databricks',