KFP parallel (#34)
This commit is contained in:
Parent: c676b54010
Commit: 8f78d52ee5
@@ -173,6 +173,11 @@ jobs:
          folder: code/training

  build_kfp_pipeline:
    env:
      # DATASETS: "https://aiadvocate.blob.core.windows.net/public/tacodata.zip,https://aiadvocate.blob.core.windows.net/public/tacodata.zip"
      DATASETS: "https://aiadvocate.blob.core.windows.net/public/tacodata.zip"

    runs-on: ubuntu-latest
    needs: [code_quality_checks, build_images]
    if: github.event_name == 'push' || github.event_name == 'issue_comment' && contains(github.event.comment.body, '/build-pipeline')
@@ -211,7 +216,7 @@ jobs:
            --kfp_host $KFP_HOST \
            --tenant ${{ secrets.tenant }} \
            --service_principal ${{ secrets.SERVICE_PRINCIPAL }} \
            --sp_secret ${{ secrets.SERVICE_PRINCIPAL_PWD }} 2>&1 >/dev/null)
            --sp_secret ${{ secrets.SERVICE_PRINCIPAL_PWD }} 2>&1 >/dev/null)
          echo "::set-env name=PUBLISHED_PIPELINE_ID::$PIPELINE_ID"
        working-directory: code

@@ -231,5 +236,6 @@ jobs:
            --run_name "${{ secrets.KFP_PIPELINE_NAME }}" \
            --tenant ${{ secrets.tenant }} \
            --service_principal ${{ secrets.SERVICE_PRINCIPAL }} \
            --sp_secret ${{ secrets.SERVICE_PRINCIPAL_PWD }}
            --sp_secret ${{ secrets.SERVICE_PRINCIPAL_PWD }} \
            --datasets $DATASETS
        working-directory: code

@@ -0,0 +1,13 @@
# Dockerfile for kubeflowyoacr.azurecr.io/databricks-notebook-runner

FROM python:3.7-slim

# pip install
RUN pip install databricks-cli && \
    apt-get update && apt-get install jq -y

COPY notebook.py /scripts/notebook.py
COPY run_config.json /scripts/run_config.json
COPY run_notebook.sh /scripts/run_notebook.sh

ENTRYPOINT [ "bash", "/scripts/run_notebook.sh"]
@@ -0,0 +1,6 @@
{
  "run_name": "kfp notebook execution",
  "existing_cluster_id":"{{CLUSTER_ID}}",
  "notebook_task":{"notebook_path":"/Shared/{{NOTEBOOK_NAME}}",
  "base_parameters":{{NOTEBOOK_PARAMETERS}}}
}
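The three {{...}} placeholders in this template are filled in at container start by run_notebook.sh (the next file) using sed. A minimal Python sketch of that substitution, with hypothetical cluster id, run id and notebook parameters, shows what the final request body ends up looking like:

# Sketch of the placeholder substitution that run_notebook.sh performs with sed.
# The cluster id, run id and parameters below are illustrative example values.
import json

template = '''{
  "run_name": "kfp notebook execution",
  "existing_cluster_id":"{{CLUSTER_ID}}",
  "notebook_task":{"notebook_path":"/Shared/{{NOTEBOOK_NAME}}",
  "base_parameters":{{NOTEBOOK_PARAMETERS}}}
}'''

filled = (template
          .replace("{{CLUSTER_ID}}", "0923-164208-example")
          .replace("{{NOTEBOOK_NAME}}", "example-kfp-run-id")
          .replace("{{NOTEBOOK_PARAMETERS}}", '{"argument_one": "param one", "argument_two": "param two"}'))

run_config = json.loads(filled)   # the result must still parse as valid JSON
print(run_config["notebook_task"]["notebook_path"])   # -> /Shared/example-kfp-run-id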
@@ -0,0 +1,44 @@
#!/bin/bash

while getopts "r:p:" option;
do
    case "$option" in
        r ) RUN_ID=${OPTARG};;
        p ) NOTEBOKK_PARAMETERS=${OPTARG};;
    esac
done

echo $RUN_ID
echo $NOTEBOKK_PARAMETERS

cd /scripts

databricks workspace import -o -l PYTHON notebook.py /Shared/$RUN_ID

sed -i 's/{{CLUSTER_ID}}/'$CLUSTER_ID'/g' run_config.json
sed -i 's/{{NOTEBOOK_PARAMETERS}}/'"$NOTEBOKK_PARAMETERS"'/g' run_config.json
sed -i 's/{{NOTEBOOK_NAME}}/'$RUN_ID'/g' run_config.json

run_id=$(databricks runs submit --json-file run_config.json | jq -r '.run_id')
databricks runs get --run-id $run_id

SECONDS=0

while [[ SECONDS -lt 600 ]]; do
    STATUS=$(databricks runs get --run-id $run_id | jq -r '.state.life_cycle_state')
    if [ $STATUS == 'TERMINATED' ]; then
        break
    fi
    echo $STATUS"..."
    sleep 2
done

RESULT_STATE=$(databricks runs get --run-id $run_id | jq -r '.state.result_state')
echo $RESULT_STATE
if [ $RESULT_STATE == 'SUCCESS' ]; then
    exit 0
else
    echo 'See details at '$(databricks runs get --run-id $run_id | jq -r '.run_page_url')
    exit 1
fi
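The script's polling loop waits up to 600 seconds (via the bash SECONDS builtin) for the submitted run to reach TERMINATED, then inspects result_state. A rough Python sketch of the same logic, reusing the script's own databricks CLI calls and a hypothetical run id, could look like this:

# Sketch of the polling done above: wait up to 600 s for the Databricks run to
# terminate, then check its result. run_id is a hypothetical example value.
import json
import subprocess
import time

run_id = "12345"
deadline = time.time() + 600
state = {}
while time.time() < deadline:
    out = subprocess.check_output(["databricks", "runs", "get", "--run-id", run_id])
    state = json.loads(out)["state"]
    if state["life_cycle_state"] == "TERMINATED":
        break
    time.sleep(2)

if state.get("result_state") == "SUCCESS":
    print("notebook run succeeded")
else:
    raise SystemExit("notebook run did not succeed")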
@@ -1,12 +1,3 @@
FROM python:3.7-slim

# pip install
RUN pip install databricks-cli && \
    apt-get update && apt-get install jq -y
FROM kubeflowyoacr.azurecr.io/databricks-notebook-runner

COPY notebook.py /scripts/notebook.py
COPY run_config.json /scripts/run_config.json
COPY run_notebook.sh /scripts/run_notebook.sh

# will be overwritten by kf pipeline
ENTRYPOINT [ "sh", "-c", "/scripts/run_notebook.sh" ]

@@ -72,15 +72,14 @@ def use_databricks_secret(secret_name='databricks-secret'):

def tacosandburritos_train(
    resource_group,
    workspace
    workspace,
    dataset
):
    """Pipeline steps"""

    persistent_volume_path = '/mnt/azure'
    data_download = 'https://aiadvocate.blob.core.windows.net/public/tacodata.zip'  # noqa: E501
    epochs = 2
    data_download = dataset  # noqa: E501
    batch = 32
    learning_rate = 0.0001
    model_name = 'tacosandburritos'
    operations = {}
    image_size = 160
@@ -108,13 +107,12 @@ def tacosandburritos_train(
        command=['curl'],
        args=['-d',
              get_callback_payload(TRAIN_START_EVENT), callback_url])  # noqa: E501
    operations['run_on_databricks'] = dsl.ContainerOp(
        name='run_on_databricks',

    operations['data processing on databricks'] = dsl.ContainerOp(
        name='data processing on databricks',
        init_containers=[start_callback],
        image=image_repo_name + '/databricks-notebook:latest',
        command=['bash'],
        arguments=[
            '/scripts/run_notebook.sh',
            '-r', dsl.RUN_ID_PLACEHOLDER,
            '-p', '{"argument_one":"param one","argument_two":"param two"}'
        ]
@@ -122,7 +120,6 @@ def tacosandburritos_train(

    operations['preprocess'] = dsl.ContainerOp(
        name='preprocess',
        init_containers=[start_callback],
        image=image_repo_name + '/preprocess:latest',
        command=['python'],
        arguments=[
@@ -134,34 +131,50 @@ def tacosandburritos_train(
            '--zipfile', data_download
        ]
    )
    operations['preprocess'].after(operations['run_on_databricks'])

    # train
    operations['training'] = dsl.ContainerOp(
        name='training',
        image=image_repo_name + '/training:latest',
        command=['python'],
        arguments=[
            '/scripts/train.py',
            '--base_path', persistent_volume_path,
            '--data', training_folder,
            '--epochs', epochs,
            '--batch', batch,
            '--image_size', image_size,
            '--lr', learning_rate,
            '--outputs', model_folder,
            '--dataset', training_dataset
        ],
        output_artifact_paths={  # change output_artifact_paths to file_outputs after this PR is merged https://github.com/kubeflow/pipelines/pull/2334  # noqa: E501
            'mlpipeline-metrics': '/mlpipeline-metrics.json',
            'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'
        }
    ).add_env_variable(V1EnvVar(name="RUN_ID", value=dsl.RUN_ID_PLACEHOLDER)).add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url))  # noqa: E501
    operations['preprocess'].after(operations['data processing on databricks'])  # noqa: E501

    # train
    # TODO: read set of parameters from config file
    with dsl.ParallelFor([{'epochs': 1, 'lr': 0.0001}, {'epochs': 2, 'lr': 0.0002}, {'epochs': 3, 'lr': 0.0003}]) as item:  # noqa: E501
        operations['training'] = dsl.ContainerOp(
            name="training",
            image=image_repo_name + '/training:latest',
            command=['python'],
            arguments=[
                '/scripts/train.py',
                '--base_path', persistent_volume_path,
                '--data', training_folder,
                '--epochs', item.epochs,
                '--batch', batch,
                '--image_size', image_size,
                '--lr', item.lr,
                '--outputs', model_folder,
                '--dataset', training_dataset
            ],
            output_artifact_paths={  # change output_artifact_paths to file_outputs after this PR is merged https://github.com/kubeflow/pipelines/pull/2334  # noqa: E501
                'mlpipeline-metrics': '/mlpipeline-metrics.json',
                'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'
            }
        ).add_env_variable(V1EnvVar(name="RUN_ID", value=dsl.RUN_ID_PLACEHOLDER)).add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)).add_env_variable(V1EnvVar(name="GIT_PYTHON_REFRESH", value='quiet'))  # noqa: E501

        operations['training'].after(operations['preprocess'])

    operations['evaluate'] = dsl.ContainerOp(
        name='evaluate',
        image="busybox",
        command=['sh', '-c'],
        arguments=[
            'echo',
            'Life is Good!'
        ]

    )
    operations['evaluate'].after(operations['training'])

    # register kubeflow artifcats model
    operations['registerkfartifacts'] = dsl.ContainerOp(
        name='registerartifacts',
    operations['register to kubeflow'] = dsl.ContainerOp(
        name='register to kubeflow',
        image=image_repo_name + '/registerartifacts:latest',
        command=['python'],
        arguments=[
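The core of this change is the dsl.ParallelFor block above: the training ContainerOp is now created once per entry in the hyperparameter list, so the training runs execute as concurrent pods instead of one sequential step. A self-contained sketch of the same pattern, using a placeholder busybox image and illustrative parameter values rather than the project's real training step:

# Minimal dsl.ParallelFor sketch (KFP v1 SDK); image and values are illustrative only.
import kfp.dsl as dsl


@dsl.pipeline(name='parallel-training-sketch')
def parallel_training_sketch():
    params = [{'epochs': 1, 'lr': 0.0001}, {'epochs': 2, 'lr': 0.0002}]
    with dsl.ParallelFor(params) as item:
        # One op instance is emitted per dict; item.epochs and item.lr resolve
        # to that iteration's values when the pipeline runs.
        dsl.ContainerOp(
            name='training',
            image='busybox',
            command=['echo'],
            arguments=['epochs:', item.epochs, 'lr:', item.lr]
        )

Because each iteration becomes its own pod writing under /mnt/azure, the shared volume has to be mountable by several pods at once, which is presumably why the PersistentVolumeClaim further down switches from ReadWriteOnce/managed-premium to ReadWriteMany/azurefile.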
@@ -174,11 +187,11 @@ def tacosandburritos_train(
            '--run_id', dsl.RUN_ID_PLACEHOLDER
        ]
    ).apply(use_azure_secret())
    operations['registerkfartifacts'].after(operations['training'])
    operations['register to kubeflow'].after(operations['evaluate'])

    # register model
    operations['register'] = dsl.ContainerOp(
        name='register',
    operations['register to AML'] = dsl.ContainerOp(
        name='register to AML',
        image=image_repo_name + '/register:latest',
        command=['python'],
        arguments=[
@@ -195,10 +208,10 @@ def tacosandburritos_train(
            '--run_id', dsl.RUN_ID_PLACEHOLDER
        ]
    ).apply(use_azure_secret())
    operations['register'].after(operations['registerkfartifacts'])
    operations['register to AML'].after(operations['register to kubeflow'])

    # register model to mlflow
    operations['register_to_mlflow'] = dsl.ContainerOp(
    operations['register to mlflow'] = dsl.ContainerOp(
        name='register to mlflow',
        image=image_repo_name + '/register-mlflow:latest',
        command=['python'],
@@ -210,7 +223,7 @@ def tacosandburritos_train(
            '--run_id', dsl.RUN_ID_PLACEHOLDER
        ]
    ).apply(use_azure_secret()).add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url))  # noqa: E501
    operations['register_to_mlflow'].after(operations['register'])
    operations['register to mlflow'].after(operations['register to AML'])

    operations['finalize'] = dsl.ContainerOp(
        name='Finalize',
@@ -221,7 +234,7 @@ def tacosandburritos_train(
            callback_url
        ]
    )
    operations['finalize'].after(operations['register_to_mlflow'])
    operations['finalize'].after(operations['register to mlflow'])

    # operations['deploy'] = dsl.ContainerOp(
    #     name='deploy',
@@ -249,7 +262,7 @@ def tacosandburritos_train(
            k8s_client.V1Volume(
                name='azure',
                persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(  # noqa: E501
                    claim_name='azure-managed-disk')
                    claim_name='azure-managed-file')
            )
        ).add_volume_mount(k8s_client.V1VolumeMount(
            mount_path='/mnt/azure', name='azure'))

@@ -71,19 +71,29 @@ def main():
        help="Service Principal Secret"
    )

    parser.add_argument(
        "--datasets",
        type=str,
        required=True,
        help="Datasets"
    )

    args = parser.parse_args()
    token = get_access_token(args.tenant, args.service_principal, args.sp_secret)  # noqa: E501
    client = kfp.Client(host=args.kfp_host, existing_token=token)
    token = get_access_token(args.tenant, args.service_principal, args.sp_secret)  # noqa: E501
    exp = client.get_experiment(experiment_name=args.experiment_name)  # noqa: E501

    pipeline_params = {}
    pipeline_params["resource_group"] = args.resource_group
    pipeline_params["workspace"] = args.workspace
    token = get_access_token(args.tenant, args.service_principal, args.sp_secret)  # noqa: E501
    exp = client.get_experiment(experiment_name=args.experiment_name)  # noqa: E501
    client.run_pipeline(exp.id,
                        job_name=args.run_name,
                        params=pipeline_params,
                        pipeline_id=args.pipeline_id)
    datasets = args.datasets.split(',')
    for dataset in datasets:
        pipeline_params["dataset"] = dataset
        client.run_pipeline(exp.id,
                            job_name=args.run_name,
                            params=pipeline_params,
                            pipeline_id=args.pipeline_id)


if __name__ == '__main__':
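With the new --datasets argument, run.py submits one pipeline run per comma-separated dataset URL instead of a single run. A condensed, self-contained sketch of that fan-out with the KFP submission stubbed out (submit_run stands in for client.run_pipeline; the URLs and parameter values are examples, not the project's):

# Condensed sketch of the per-dataset fan-out added to run.py.
import argparse


def submit_run(params):
    # stand-in for client.run_pipeline(exp.id, job_name=..., params=params, pipeline_id=...)
    print("would submit a pipeline run with params:", params)


parser = argparse.ArgumentParser()
parser.add_argument("--datasets", type=str, required=True, help="Comma-separated dataset URLs")
args = parser.parse_args([
    "--datasets",
    "https://example.blob.core.windows.net/public/a.zip,https://example.blob.core.windows.net/public/b.zip"
])

pipeline_params = {"resource_group": "example-rg", "workspace": "example-ws"}
for dataset in args.datasets.split(','):
    pipeline_params["dataset"] = dataset   # each dataset URL becomes its own KFP run
    submit_run(dict(pipeline_params))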
@@ -1,12 +1,12 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: azure-managed-disk
  name: azure-managed-file
  namespace: kubeflow
spec:
  accessModes:
    - ReadWriteOnce
  storageClassName: managed-premium
    - ReadWriteMany
  storageClassName: azurefile
  resources:
    requests:
      storage: 300Gi

@@ -0,0 +1,63 @@
apiVersion: v1
kind: Service
metadata:
  name: mlflow
  labels:
    app.kubernetes.io/name: mlflow
    app.kubernetes.io/instance: mlflow
    app.kubernetes.io/version: "1.8.0"
    app.kubernetes.io/managed-by: Helm
spec:
  type: ClusterIP
  ports:
    - port: 5000
      targetPort: http
      protocol: TCP
      name: http
  selector:
    app.kubernetes.io/name: mlflow
    app.kubernetes.io/instance: mlflow
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow
  labels:
    app.kubernetes.io/name: mlflow
    app.kubernetes.io/instance: mlflow
    app.kubernetes.io/version: "1.8.0"
    app.kubernetes.io/managed-by: Helm
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: mlflow
      app.kubernetes.io/instance: mlflow
  template:
    metadata:
      labels:
        app.kubernetes.io/name: mlflow
        app.kubernetes.io/instance: mlflow
    spec:
      containers:
        - name: mlflow
          image: "dtzar/mlflow:latest"
          imagePullPolicy: IfNotPresent
          args:
            - --host=0.0.0.0
            - --port=80
            - --backend-store-uri=sqlite:///mlflow.db
            - --default-artifact-root=/mnt/azure
            - --expose-prometheus=yes
          ports:
            - name: http
              containerPort: 80
              protocol: TCP
          volumeMounts:
            - mountPath: "/mnt/azure"
              name: artifacts
      volumes:
        - name: artifacts
          persistentVolumeClaim:
            claimName: azure-managed-file