Eugene Fedorenko 2020-07-21 13:55:25 -07:00, committed by GitHub
Parent: d81156416e
Commit: 3a5a5330a1
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
15 changed files: 464 additions and 418 deletions

@@ -0,0 +1,21 @@
name: Exit Handler
inputs:
- {name: kfp_host_url, type: String}
- {name: azdocallbackinfo, type: String}
- {name: run_id, type: String}
- {name: tenant_id, type: String}
- {name: service_principal_id, type: String}
- {name: service_principal_password, type: String}
implementation:
  container:
    image: ''
    command: [
      "python", "/scripts/azdocallback.py",
      '--kfp_host_url', {inputValue: kfp_host_url},
      '--azdocallback', {inputValue: azdocallbackinfo},
      '--run_id', {inputValue: run_id},
      '--tenant_id', {inputValue: tenant_id},
      '--service_principal_id', {inputValue: service_principal_id},
      '--service_principal_password', {inputValue: service_principal_password}
    ]
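Note: the empty image field above is deliberate; the pipeline injects the concrete image at compile time via the use_image helper (see code/utils/kfp_helper.py below). A minimal sketch of how this component is loaded and wired up, mirroring the pipeline code further down (nothing here is new to this commit except the layout of the sketch):

import os
import kfp.components as components
from utils.kfp_helper import use_image

component_root = os.path.dirname(os.path.abspath(__file__))
exit_op = components.load_component_from_file(
    os.path.join(component_root, 'azdocallback/component.yaml'))
exit_image_name = 'kubeflowyoacr.azurecr.io/mexicanfood/azdocallback:%s' % (
    os.getenv('AZDOCALLBACK_TAG') or 'latest')

# Inside the pipeline function the component factory is called with its declared
# inputs and the concrete image is swapped in:
#   exit_op(...).apply(use_azure_secret()).apply(use_image(exit_image_name))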

@@ -0,0 +1,13 @@
name: Processing on Databricks
inputs:
- {name: run_id, type: String}
- {name: notebook_params, type: String}
implementation:
  container:
    image: 'kubeflowyoacr.azurecr.io/mexicanfood/databricks-notebook:latest'
    command: [
      "bash", "/scripts/run_notebook.sh",
      '--r', {inputValue: run_id},
      '--p', {inputValue: notebook_params}
    ]

@@ -0,0 +1,13 @@
name: Evaluate Model
inputs:
- {name: model, type: kubeflow.org/alpha/model}
outputs:
- {name: result, type: String}
implementation:
  container:
    image: 'busybox'
    command: [
      # KFP substitutes the two placeholders below with file paths and appends them
      # after the script, so in the `sh -c` body $0 is the model path and $1 is the result path.
      "sh", "-c", 'echo $1 && mkdir -p /tmp/outputs/result && sleep 5s && echo $0 > $1', {inputPath: model}, {outputPath: result}
    ]

@@ -0,0 +1,13 @@
name: Exit Handler
inputs:
- {name: callback_url, type: String}
- {name: callback_payload, type: String}
implementation:
  container:
    image: 'curlimages/curl'
    command: [
      "curl", "-d", {inputValue: callback_payload}, {inputValue: callback_url}
    ]

@@ -0,0 +1,13 @@
name: Finalize
inputs:
- {name: callback_url, type: String}
- {name: callback_payload, type: String}
implementation:
  container:
    image: 'curlimages/curl'
    command: [
      "curl", "-d", {inputValue: callback_payload}, {inputValue: callback_url}
    ]

@@ -0,0 +1,13 @@
name: Run MLflow Project on Azure Databricks
inputs:
- {name: mlflow_experiment_id, type: String}
- {name: kf_run_id, type: String}
implementation:
  container:
    image: ''
    command: [
      "python", "/scripts/run.py",
      '--experiement_id', {inputValue: mlflow_experiment_id},
      '--kf_run_id', {inputValue: kf_run_id}
    ]

@@ -1,20 +1,19 @@
# flake8: noqa E501
"""Main pipeline file"""
from kubernetes import client as k8s_client
import kfp.dsl as dsl
import kfp.compiler as compiler
import kfp.components as components
from kfp.azure import use_azure_secret
import json
import os
from kubernetes.client.models import V1EnvVar
from utils.kfp_helper import use_databricks_secret, use_image
TRAIN_START_EVENT = "Training Started"
TRAIN_FINISH_EVENT = "Training Finished"
@dsl.pipeline(
name='Tacos vs. Burritos',
description='Simple TF CNN'
)
def get_callback_payload(event_type):
payload = {}
payload['event_type'] = event_type
@@ -26,235 +25,126 @@ def get_callback_payload(event_type):
return json.dumps(payload)
# TODO: refactor this. Looks ugly
def use_databricks_secret(secret_name='databricks-secret'):
def _use_databricks_secret(task):
from kubernetes import client as k8s_client
(
task.container
.add_env_variable(
k8s_client.V1EnvVar(
name='DATABRICKS_HOST',
value_from=k8s_client.V1EnvVarSource(
secret_key_ref=k8s_client.V1SecretKeySelector(
name=secret_name,
key='DATABRICKS_HOST'
)
)
)
)
.add_env_variable( # noqa: E131
k8s_client.V1EnvVar(
name='DATABRICKS_TOKEN',
value_from=k8s_client.V1EnvVarSource(
secret_key_ref=k8s_client.V1SecretKeySelector(
name=secret_name,
key='DATABRICKS_TOKEN'
)
)
)
)
.add_env_variable( # noqa: E131
k8s_client.V1EnvVar(
name='CLUSTER_ID',
value_from=k8s_client.V1EnvVarSource(
secret_key_ref=k8s_client.V1SecretKeySelector(
name=secret_name,
key='CLUSTER_ID'
)
)
)
)
)
return task
return _use_databricks_secret
def get_start_callback_container():
return dsl.UserContainer('callback',
'curlimages/curl',
command=['curl'],
args=['-d',
get_callback_payload(TRAIN_START_EVENT), callback_url]) # noqa: E501
persistent_volume_path = '/mnt/azure'
batch = 32
model_name = 'tacosandburritos'
operations = {}
image_size = 160
training_folder = 'train'
training_dataset = 'train.txt'
model_folder = 'model'
callback_url = 'kubemlopsbot-svc.kubeflow.svc.cluster.local:8080'
mlflow_url = 'http://mlflow:5000'
component_root = os.path.join(os.path.dirname(
os.path.abspath(__file__)), ".")
image_repo_name = "kubeflowyoacr.azurecr.io/mexicanfood"
databricks_op = components.load_component_from_file(os.path.join(component_root, 'databricks/component.yaml')) # noqa: E501
databricks_image_name = image_repo_name + '/databricks-notebook:%s' % (os.getenv('DATABRICKS_TAG') or 'latest') # noqa: E501
preprocess_op = components.load_component_from_file(os.path.join(component_root, 'preprocess/component.yaml')) # noqa: E501
preprocess_image_name = image_repo_name + '/preprocess:%s' % (os.getenv('PREPROCESS_TAG') or 'latest') # noqa: E501
train_op = components.load_component_from_file(os.path.join(component_root, 'training/component.yaml')) # noqa: E501
train_image_name = image_repo_name + '/training:%s' % (os.getenv('TRAINING_TAG') or 'latest') # noqa: E501
evaluate_op = components.load_component_from_file(os.path.join(component_root, 'evaluate/component.yaml')) # noqa: E501
register_op = components.load_component_from_file(os.path.join(component_root, 'register/component.yaml')) # noqa: E501
register_images_name = image_repo_name + '/register:%s' % (os.getenv('REGISTER_TAG') or 'latest') # noqa: E501
register_mlflow_op = components.load_component_from_file(os.path.join(component_root, 'register-mlflow/component.yaml')) # noqa: E501
register_mlflow_image_name = image_repo_name + '/register-mlflow:%s' % (os.getenv('REGISTERMLFLOW_TAG') or 'latest') # noqa: E501
finalize_op = components.load_component_from_file(os.path.join(component_root, 'finalize/component.yaml')) # noqa: E501
exit_op = components.load_component_from_file(os.path.join(component_root, 'exit-handler/component.yaml')) # noqa: E501
@dsl.pipeline(
name='Tacos vs. Burritos',
description='Simple TF CNN'
)
def tacosandburritos_train(
resource_group,
workspace,
dataset
):
"""Pipeline steps"""
persistent_volume_path = '/mnt/azure'
data_download = dataset # noqa: E501
batch = 32
model_name = 'tacosandburritos'
operations = {}
image_size = 160
training_folder = 'train'
training_dataset = 'train.txt'
model_folder = 'model'
image_repo_name = "kubeflowyoacr.azurecr.io/mexicanfood"
callback_url = 'kubemlopsbot-svc.kubeflow.svc.cluster.local:8080'
mlflow_url = 'http://mlflow:5000'
exit_handler = exit_op(callback_url=callback_url,
callback_payload=get_callback_payload(TRAIN_FINISH_EVENT))
exit_op = dsl.ContainerOp(
name='Exit Handler',
image="curlimages/curl",
command=['curl'],
arguments=[
'-d', get_callback_payload(TRAIN_FINISH_EVENT),
callback_url
]
)
with dsl.ExitHandler(exit_handler):
with dsl.ExitHandler(exit_op):
start_callback = \
dsl.UserContainer('callback',
'curlimages/curl',
command=['curl'],
args=['-d',
get_callback_payload(TRAIN_START_EVENT), callback_url]) # noqa: E501
operations['data processing on databricks'] = databricks_op(run_id=dsl.RUN_ID_PLACEHOLDER, # noqa: E501
notebook_params='{"argument_one":"param one","argument_two":"param two"}' # noqa: E501
).apply(use_databricks_secret()). \
add_init_container(get_start_callback_container()). \
apply(use_image(databricks_image_name))
operations['data processing on databricks'] = dsl.ContainerOp(
name='data processing on databricks',
init_containers=[start_callback],
image=image_repo_name + '/databricks-notebook:latest',
arguments=[
'-r', dsl.RUN_ID_PLACEHOLDER,
'-p', '{"argument_one":"param one","argument_two":"param two"}'
]
).apply(use_databricks_secret())
operations['preprocess'] = dsl.ContainerOp(
name='preprocess',
image=image_repo_name + '/preprocess:latest',
command=['python'],
arguments=[
'/scripts/data.py',
'--base_path', persistent_volume_path,
'--data', training_folder,
'--target', training_dataset,
'--img_size', image_size,
'--zipfile', data_download
]
)
operations['preprocess'] = preprocess_op(base_path=persistent_volume_path, # noqa: E501
training_folder=training_folder, # noqa: E501
target=training_dataset,
image_size=image_size,
zipfile=dataset). \
apply(use_image(preprocess_image_name))
operations['preprocess'].after(operations['data processing on databricks']) # noqa: E501
# train
# TODO: read set of parameters from config file
# with dsl.ParallelFor([{'epochs': 1, 'lr': 0.0001}, {'epochs': 1, 'lr': 0.0002}]) as item: # noqa: E501
operations['training'] = dsl.ContainerOp(
name="training",
image=image_repo_name + '/training:latest',
command=['python'],
arguments=[
'/scripts/train.py',
'--base_path', persistent_volume_path,
'--data', training_folder,
'--epochs', 2,
'--batch', batch,
'--image_size', image_size,
'--lr', 0.0001,
'--outputs', model_folder,
'--dataset', training_dataset
],
output_artifact_paths={ # change output_artifact_paths to file_outputs after this PR is merged https://github.com/kubeflow/pipelines/pull/2334 # noqa: E501
'mlpipeline-metrics': '/mlpipeline-metrics.json',
'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'
}
).add_env_variable(V1EnvVar(name="RUN_ID", value=dsl.RUN_ID_PLACEHOLDER)).add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)).add_env_variable(V1EnvVar(name="GIT_PYTHON_REFRESH", value='quiet')) # noqa: E501
operations['training'] = train_op(base_path=persistent_volume_path,
training_folder=training_folder,
epochs=2,
batch=batch,
image_size=image_size,
lr=0.0001,
model_folder=model_folder,
images=training_dataset,
dataset=operations['preprocess'].outputs['dataset']). \
set_memory_request('16G'). \
add_env_variable(V1EnvVar(name="RUN_ID", value=dsl.RUN_ID_PLACEHOLDER)). \
add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)). \
add_env_variable(V1EnvVar(name="GIT_PYTHON_REFRESH", value='quiet')).apply(use_image(train_image_name)) # noqa: E501, E127
operations['training'].after(operations['preprocess'])
operations['evaluate'] = dsl.ContainerOp(
name='evaluate',
image="busybox",
command=['sh', '-c'],
arguments=[
'echo',
'Life is Good!'
]
operations['evaluate'] = evaluate_op(model=operations['training'].outputs['model'])
operations['evaluate'].after(operations['training'])
)
operations['evaluate'].after(operations['training'])
# register kubeflow artifacts model
operations['register to kubeflow'] = dsl.ContainerOp(
name='register to kubeflow',
image=image_repo_name + '/registerartifacts:latest',
command=['python'],
arguments=[
'/scripts/registerartifacts.py',
'--base_path', persistent_volume_path,
'--model', 'latest.h5',
'--model_name', model_name,
'--data', training_folder,
'--dataset', training_dataset,
'--run_id', dsl.RUN_ID_PLACEHOLDER
]
).apply(use_azure_secret())
operations['register to kubeflow'].after(operations['evaluate'])
operations['register to AML'] = register_op(base_path=persistent_volume_path,
model_file='latest.h5',
model_name=model_name,
tenant_id='$(AZ_TENANT_ID)',
service_principal_id='$(AZ_CLIENT_ID)',
service_principal_password='$(AZ_CLIENT_SECRET)',
subscription_id='$(AZ_SUBSCRIPTION_ID)',
resource_group=resource_group,
workspace=workspace,
run_id=dsl.RUN_ID_PLACEHOLDER).apply(use_azure_secret()).apply(use_image(register_images_name)) # noqa: E501, E127
# register model
operations['register to AML'] = dsl.ContainerOp(
name='register to AML',
image=image_repo_name + '/register:latest',
command=['python'],
arguments=[
'/scripts/register.py',
'--base_path', persistent_volume_path,
'--model', 'latest.h5',
'--model_name', model_name,
'--tenant_id', "$(AZ_TENANT_ID)",
'--service_principal_id', "$(AZ_CLIENT_ID)",
'--service_principal_password', "$(AZ_CLIENT_SECRET)",
'--subscription_id', "$(AZ_SUBSCRIPTION_ID)",
'--resource_group', resource_group,
'--workspace', workspace,
'--run_id', dsl.RUN_ID_PLACEHOLDER
]
).apply(use_azure_secret())
operations['register to AML'].after(operations['register to kubeflow'])
operations['register to AML'].after(operations['evaluate'])
operations['register to mlflow'] = register_mlflow_op(model='model',
model_name=model_name,
experiment_name='mexicanfood',
run_id=dsl.RUN_ID_PLACEHOLDER).apply(use_azure_secret()). \
add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)).apply(use_image(register_mlflow_image_name)) # noqa: E501
# register model to mlflow
operations['register to mlflow'] = dsl.ContainerOp(
name='register to mlflow',
image=image_repo_name + '/register-mlflow:latest',
command=['python'],
arguments=[
'/scripts/register.py',
'--model', 'model',
'--model_name', model_name,
'--experiment_name', 'mexicanfood',
'--run_id', dsl.RUN_ID_PLACEHOLDER
]
).apply(use_azure_secret()).add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)) # noqa: E501
operations['register to mlflow'].after(operations['register to AML'])
operations['finalize'] = dsl.ContainerOp(
name='Finalize',
image="curlimages/curl",
command=['curl'],
arguments=[
'-d', get_callback_payload("Model is registered"),
callback_url
]
)
operations['finalize'] = finalize_op(callback_url=callback_url,
callback_payload=get_callback_payload("Model is registered"))
operations['finalize'].after(operations['register to mlflow'])
# operations['deploy'] = dsl.ContainerOp(
# name='deploy',
# image=image_repo_name + '/deploy:latest',
# command=['sh'],
# arguments=[
# '/scripts/deploy.sh',
# '-n', model_name,
# '-m', model_name,
# '-t', "$(AZ_TENANT_ID)",
# '-r', resource_group,
# '-w', workspace,
# '-s', "$(AZ_CLIENT_ID)",
# '-p', "$(AZ_CLIENT_SECRET)",
# '-u', "$(AZ_SUBSCRIPTION_ID)",
# '-b', persistent_volume_path,
# '-x', dsl.RUN_ID_PLACEHOLDER
# ]
# ).apply(use_azure_secret())
# operations['deploy'].after(operations['register'])
for _, op_1 in operations.items():
op_1.container.set_image_pull_policy("Always")
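The diff does not show how this pipeline is packaged; with the kfp.compiler import above, a typical entry point looks roughly like this (the archive name is illustrative, not taken from this commit):

if __name__ == '__main__':
    # Compile the pipeline function into an archive that can be uploaded to Kubeflow Pipelines.
    compiler.Compiler().compile(tacosandburritos_train, __file__ + '.tar.gz')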

@@ -1,60 +1,56 @@
"KubeFlow Pipeline with AzureDevops Callback"
# flake8: noqa E501
# "KubeFlow Pipeline with AzureDevops Callback"
import os
from kubernetes import client as k8s_client
import kfp.dsl as dsl
import kfp.compiler as compiler
# import kfp.components as comp
import kfp.components as components
from kfp.azure import use_azure_secret
from kubernetes.client.models import V1EnvVar
from utils.kfp_helper import use_databricks_secret, use_image
persistent_volume_path = '/mnt/azure'
batch = 32
model_name = 'tacosandburritos'
operations = {}
image_size = 160
training_folder = 'train'
training_dataset = 'train.txt'
model_folder = 'model'
image_repo_name = "kubeflowyoacr.azurecr.io/mexicanfood"
mlflow_url = 'http://mlflow:5000'
kfp_host_url = 'http://52.149.63.253/pipeline'
component_root = os.path.join(os.path.dirname(
os.path.abspath(__file__)), ".")
image_repo_name = "kubeflowyoacr.azurecr.io/mexicanfood"
mlflow_project_op = components.load_component_from_file(os.path.join(component_root, 'mlflow-project/component.yaml')) # noqa: E501
mlflow_project_image_name = image_repo_name + '/mlflowproject:%s' % (os.getenv('MLFLOWPROJECT_TAG') or 'latest') # noqa: E501
train_op = components.load_component_from_file(os.path.join(component_root, 'training/component.yaml')) # noqa: E501
train_image_name = image_repo_name + '/training:%s' % (os.getenv('TRAINING_TAG') or 'latest') # noqa: E501
evaluate_op = components.load_component_from_file(os.path.join(component_root, 'evaluate/component.yaml')) # noqa: E501
register_op = components.load_component_from_file(os.path.join(component_root, 'register/component.yaml')) # noqa: E501
register_images_name = image_repo_name + '/register:%s' % (os.getenv('REGISTER_TAG') or 'latest') # noqa: E501
register_mlflow_op = components.load_component_from_file(os.path.join(component_root, 'register-mlflow/component.yaml')) # noqa: E501
register_mlflow_image_name = image_repo_name + '/register-mlflow:%s' % (os.getenv('REGISTERMLFLOW_TAG') or 'latest') # noqa: E501
exit_op = components.load_component_from_file(os.path.join(component_root, 'azdocallback/component.yaml')) # noqa: E501
exit_image_name = image_repo_name + '/azdocallback:%s' % (os.getenv('AZDOCALLBACK_TAG') or 'latest') # noqa: E501
preprocess_op = components.load_component_from_file(os.path.join(component_root, 'preprocess/component.yaml')) # noqa: E501
preprocess_image_name = image_repo_name + '/preprocess:%s' % (os.getenv('PREPROCESS_TAG') or 'latest') # noqa: E501
@dsl.pipeline(
name='Tacos vs. Burritos',
description='Simple TF CNN'
)
def use_databricks_secret(secret_name='databricks-secret'):
def _use_databricks_secret(task):
from kubernetes import client as k8s_client
(
task.container
.add_env_variable(
k8s_client.V1EnvVar(
name='DATABRICKS_HOST',
value_from=k8s_client.V1EnvVarSource(
secret_key_ref=k8s_client.V1SecretKeySelector(
name=secret_name,
key='DATABRICKS_HOST'
)
)
)
)
.add_env_variable( # noqa: E131
k8s_client.V1EnvVar(
name='DATABRICKS_TOKEN',
value_from=k8s_client.V1EnvVarSource(
secret_key_ref=k8s_client.V1SecretKeySelector(
name=secret_name,
key='DATABRICKS_TOKEN'
)
)
)
)
.add_env_variable( # noqa: E131
k8s_client.V1EnvVar(
name='CLUSTER_ID',
value_from=k8s_client.V1EnvVarSource(
secret_key_ref=k8s_client.V1SecretKeySelector(
name=secret_name,
key='CLUSTER_ID'
)
)
)
)
)
return task
return _use_databricks_secret
def tacosandburritos_train(
resource_group,
workspace,
@@ -62,175 +58,71 @@ def tacosandburritos_train(
mlflow_experiment_id,
azdocallbackinfo=None
):
"""Pipeline steps"""
persistent_volume_path = '/mnt/azure'
data_download = dataset # noqa: E501,F841
batch = 32
model_name = 'tacosandburritos'
operations = {}
image_size = 160
training_folder = 'train'
training_dataset = 'train.txt'
model_folder = 'model'
image_repo_name = "kubeflowyoacr.azurecr.io/mexicanfood"
mlflow_url = 'http://mlflow:5000'
kfp_host_url = 'http://52.149.63.253/pipeline'
exit_handler_op = exit_op(kfp_host_url=kfp_host_url,
azdocallbackinfo=azdocallbackinfo,
run_id=dsl.RUN_ID_PLACEHOLDER,
tenant_id="$(AZ_TENANT_ID)",
service_principal_id="$(AZ_CLIENT_ID)",
service_principal_password="$(AZ_CLIENT_SECRET)").apply(use_azure_secret()).apply(use_image(exit_image_name)) # noqa: E501
exit_op = dsl.ContainerOp(
name='Exit Handler',
image=image_repo_name + '/azdocallback:%s' % (os.getenv('AZDOCALLBACK_TAG') or 'latest'), # noqa: E501
command=['python'],
arguments=[
'/scripts/azdocallback.py',
'--kfp_host_url', kfp_host_url,
'--azdocallback', azdocallbackinfo,
'--run_id', dsl.RUN_ID_PLACEHOLDER,
'--tenant_id', "$(AZ_TENANT_ID)",
'--service_principal_id', "$(AZ_CLIENT_ID)",
'--service_principal_password', "$(AZ_CLIENT_SECRET)",
]
).apply(use_azure_secret())
with dsl.ExitHandler(exit_op=exit_handler_op):
with dsl.ExitHandler(exit_op=exit_op):
operations['mlflowproject'] = mlflow_project_op(mlflow_experiment_id=mlflow_experiment_id, # noqa: E501
kf_run_id=dsl.RUN_ID_PLACEHOLDER).apply(use_databricks_secret()).apply(use_image(mlflow_project_image_name)) # noqa: E501
operations['mlflowproject'] = dsl.ContainerOp(
name='Run MLflow Project on Azure Databricks',
image=image_repo_name + '/mlflowproject:%s' % (os.getenv('MLFLOWPROJECT_TAG') or 'latest'), # noqa: E501
command=['python'],
arguments=[
'/scripts/run.py',
'--experiement_id', mlflow_experiment_id,
'--kf_run_id', dsl.RUN_ID_PLACEHOLDER
]
).apply(use_databricks_secret())
operations['preprocess'] = preprocess_op(base_path=persistent_volume_path, # noqa: E501
training_folder=training_folder, # noqa: E501
target=training_dataset,
image_size=image_size,
zipfile=dataset).apply(use_image(preprocess_image_name)) # noqa: E501
# operations['preprocess'] = dsl.ContainerOp(
# name='preprocess',
# image=image_repo_name + '/preprocess:%s' % (os.getenv('PREPROCESS_TAG') or 'latest'), # noqa: E501
# command=['python'],
# arguments=[
# '/scripts/data.py',
# '--base_path', persistent_volume_path,
# '--data', training_folder,
# '--target', training_dataset,
# '--img_size', image_size,
# '--zipfile', data_download
# ]
# )
operations['preprocess'].after(operations['mlflowproject']) # noqa: E501
# operations['preprocess'].after(operations['data processing on databricks']) # noqa: E501
operations['training'] = train_op(base_path=persistent_volume_path,
training_folder=training_folder,
epochs=2,
batch=batch,
image_size=image_size,
lr=0.0001,
model_folder=model_folder,
images=training_dataset,
dataset=operations['preprocess'].outputs['dataset']). \
set_memory_request('16G'). \
add_env_variable(V1EnvVar(name="RUN_ID", value=dsl.RUN_ID_PLACEHOLDER)). \
add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)). \
add_env_variable(V1EnvVar(name="GIT_PYTHON_REFRESH", value='quiet')). \
apply(use_image(train_image_name))
# train
# TODO: read set of parameters from config file
# with dsl.ParallelFor([{'epochs': 1, 'lr': 0.0001}, {'epochs': 1, 'lr': 0.0002}]) as item: # noqa: E501
operations['training'] = dsl.ContainerOp(
name="training",
image=image_repo_name + '/training:%s' % (os.getenv('TRAINING_TAG') or 'latest'), # noqa: E501
command=['python'],
arguments=[
'/scripts/train.py',
'--base_path', persistent_volume_path,
'--data', training_folder,
'--epochs', 2,
'--batch', batch,
'--image_size', image_size,
'--lr', 0.0001,
'--outputs', model_folder,
'--dataset', training_dataset
],
output_artifact_paths={ # change output_artifact_paths to file_outputs after this PR is merged https://github.com/kubeflow/pipelines/pull/2334 # noqa: E501
'mlpipeline-metrics': '/mlpipeline-metrics.json',
'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'
}
).add_env_variable(V1EnvVar(name="RUN_ID", value=dsl.RUN_ID_PLACEHOLDER)).add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)).add_env_variable(V1EnvVar(name="GIT_PYTHON_REFRESH", value='quiet')) # noqa: E501
operations['training'].after(operations['preprocess'])
operations['training'].after(operations['mlflowproject'])
operations['evaluate'] = evaluate_op(model=operations['training'].outputs['model'])
operations['evaluate'].after(operations['training'])
operations['evaluate'] = dsl.ContainerOp(
name='evaluate',
image="busybox",
command=['sh', '-c'],
arguments=[
'echo',
'Life is Good!'
]
operations['register to AML'] = register_op(base_path=persistent_volume_path,
model_file='latest.h5',
model_name=model_name,
tenant_id='$(AZ_TENANT_ID)',
service_principal_id='$(AZ_CLIENT_ID)',
service_principal_password='$(AZ_CLIENT_SECRET)',
subscription_id='$(AZ_SUBSCRIPTION_ID)',
resource_group=resource_group,
workspace=workspace,
run_id=dsl.RUN_ID_PLACEHOLDER). \
apply(use_azure_secret()). \
apply(use_image(register_images_name))
)
operations['evaluate'].after(operations['training'])
# register kubeflow artifacts model
# operations['register to kubeflow'] = dsl.ContainerOp(
# name='register to kubeflow',
# image=image_repo_name + '/registerartifacts:%s' % (os.getenv('REGISTERARTIFACTS_TAG') or 'latest'), # noqa: E501
# command=['python'],
# arguments=[
# '/scripts/registerartifacts.py',
# '--base_path', persistent_volume_path,
# '--model', 'latest.h5',
# '--model_name', model_name,
# '--data', training_folder,
# '--dataset', training_dataset,
# '--run_id', dsl.RUN_ID_PLACEHOLDER
# ]
# ).apply(use_azure_secret())
# operations['register to kubeflow'].after(operations['evaluate'])
# register model
operations['register to AML'] = dsl.ContainerOp(
name='register to AML',
image=image_repo_name + '/register:%s' % (os.getenv('REGISTER_TAG') or 'latest'), # noqa: E501
command=['python'],
arguments=[
'/scripts/register.py',
'--base_path', persistent_volume_path,
'--model', 'latest.h5',
'--model_name', model_name,
'--tenant_id', "$(AZ_TENANT_ID)",
'--service_principal_id', "$(AZ_CLIENT_ID)",
'--service_principal_password', "$(AZ_CLIENT_SECRET)",
'--subscription_id', "$(AZ_SUBSCRIPTION_ID)",
'--resource_group', resource_group,
'--workspace', workspace,
'--run_id', dsl.RUN_ID_PLACEHOLDER
]
).apply(use_azure_secret())
operations['register to AML'].after(operations['evaluate'])
# register model to mlflow
operations['register to mlflow'] = dsl.ContainerOp(
name='register to mlflow',
image=image_repo_name + '/register-mlflow:%s' % (os.getenv('REGISTERMLFLOW_TAG') or 'latest'), # noqa: E501
command=['python'],
arguments=[
'/scripts/register.py',
'--model', 'model',
'--model_name', model_name,
'--experiment_name', 'mexicanfood',
'--run_id', dsl.RUN_ID_PLACEHOLDER
]
).apply(use_azure_secret()).add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)) # noqa: E501
operations['register to mlflow'].after(operations['register to AML'])
operations['register to mlflow'] = register_mlflow_op(model='model',
model_name=model_name,
experiment_name='mexicanfood',
run_id=dsl.RUN_ID_PLACEHOLDER). \
apply(use_azure_secret()). \
add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)). \
apply(use_image(register_mlflow_image_name))
# operations['deploy'] = dsl.ContainerOp(
# name='deploy',
# image=image_repo_name + '/deploy:latest',
# command=['sh'],
# arguments=[
# '/scripts/deploy.sh',
# '-n', model_name,
# '-m', model_name,
# '-t', "$(AZ_TENANT_ID)",
# '-r', resource_group,
# '-w', workspace,
# '-s', "$(AZ_CLIENT_ID)",
# '-p', "$(AZ_CLIENT_SECRET)",
# '-u', "$(AZ_SUBSCRIPTION_ID)",
# '-b', persistent_volume_path,
# '-x', dsl.RUN_ID_PLACEHOLDER
# ]
# ).apply(use_azure_secret())
# operations['deploy'].after(operations['register'])
operations['register to mlflow'].after(operations['register to AML'])
for _, op_1 in operations.items():
op_1.container.set_image_pull_policy("Always")

@@ -0,0 +1,23 @@
name: Data Preprocessing
inputs:
- {name: base_path, type: String}
- {name: training_folder, type: String}
- {name: target, type: String, default: 'train.txt'}
- {name: image_size, type: Integer}
- {name: zipfile, type: String}
outputs:
- {name: dataset, type: kubeflow.org/alpha/data_set}
implementation:
  container:
    image: 'kubeflowyoacr.azurecr.io/mexicanfood/preprocess:latest'
    command: [
      "python", "/scripts/data.py",
      '--base_path', {inputValue: base_path},
      '--data', {inputValue: training_folder},
      '--target', {inputValue: target},
      '--img_size', {inputValue: image_size},
      '--zipfile', {inputValue: zipfile},
      '--output_dataset', {outputPath: dataset}
    ]

@@ -93,6 +93,7 @@ if __name__ == "__main__":
'-z', '--zipfile', help='source data zip file', default='../../tacodata.zip') # noqa: E501
parser.add_argument('-f', '--force',
help='force clear all data', default=False, action='store_true') # noqa: E501
parser.add_argument('-o', '--output_dataset') # noqa: E501
args = parser.parse_args()
print(args)
@@ -110,13 +111,18 @@ if __name__ == "__main__":
download('https://aiadvocate.blob.core.windows.net/public/tacodata.zip',
str(base_path), args.force)
print('Testing images...')
images = walk_images(str(data_path), args.img_size)
if os.path.exists(str(target_path)):
print('dataset text file already exists, skipping check')
else:
print('Testing images...')
images = walk_images(str(data_path), args.img_size)
# save file
print('writing dataset to {}'.format(target_path))
with open(str(target_path), 'w+') as f:
f.write('\n'.join(images))
output_path = Path(args.output_dataset).resolve(strict=False)
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
with open(str(output_path), 'w+') as f:
f.write('\n'.join(images))

@@ -0,0 +1,17 @@
name: Register to MLFlow
inputs:
- {name: model, type: String}
- {name: model_name, type: String}
- {name: experiment_name, type: String}
- {name: run_id, type: String}
implementation:
  container:
    image: 'kubeflowyoacr.azurecr.io/mexicanfood/register-mlflow:latest'
    command: [
      "python", "/scripts/register.py",
      '--model', {inputValue: model},
      '--model_name', {inputValue: model_name},
      '--experiment_name', {inputValue: experiment_name},
      '--run_id', {inputValue: run_id}
    ]

@@ -0,0 +1,29 @@
name: Register to AML
inputs:
- {name: base_path, type: String}
- {name: model_file, type: String}
- {name: model_name, type: String}
- {name: tenant_id, type: String}
- {name: service_principal_id, type: String}
- {name: service_principal_password, type: String}
- {name: subscription_id, type: String}
- {name: resource_group, type: String}
- {name: workspace, type: String}
- {name: run_id, type: String}
implementation:
  container:
    image: 'kubeflowyoacr.azurecr.io/mexicanfood/register:latest'
    command: [
      "python", "/scripts/register.py",
      '--base_path', {inputValue: base_path},
      '--model', {inputValue: model_file},
      '--model_name', {inputValue: model_name},
      '--tenant_id', {inputValue: tenant_id},
      '--service_principal_id', {inputValue: service_principal_id},
      '--service_principal_password', {inputValue: service_principal_password},
      '--subscription_id', {inputValue: subscription_id},
      '--resource_group', {inputValue: resource_group},
      '--workspace', {inputValue: workspace},
      '--run_id', {inputValue: run_id}
    ]

@@ -0,0 +1,35 @@
name: Training
inputs:
- {name: base_path, type: String}
- {name: training_folder, type: String}
- {name: epochs, type: Integer}
- {name: batch, type: Integer}
- {name: image_size, type: Integer}
- {name: lr, type: Float}
- {name: model_folder, type: String}
- {name: images, type: String}
- {name: dataset, type: kubeflow.org/alpha/data_set, default: None}
outputs:
- {name: model, type: kubeflow.org/alpha/model}
- {name: mlpipeline-ui-metadata, type: String}
- {name: mlpipeline-metrics, type: String}
implementation:
  container:
    image: 'kubeflowyoacr.azurecr.io/mexicanfood/training:latest'
    command: [
      "python", "/scripts/train.py",
      '--base_path', {inputValue: base_path},
      '--data', {inputValue: training_folder},
      '--epochs', {inputValue: epochs},
      '--batch', {inputValue: batch},
      '--image_size', {inputValue: image_size},
      '--lr', {inputValue: lr},
      '--outputs', {inputValue: model_folder},
      '--dataset', {inputPath: dataset},
      '--model', {outputPath: model},
      '--ui_metadata', {outputPath: mlpipeline-ui-metadata},
      '--metrics', {outputPath: mlpipeline-metrics}
    ]

@@ -76,7 +76,8 @@ def run(
batch_size=32,
learning_rate=0.0001,
output='model',
dset=None):
dset=None,
metrics_file='/mlpipeline-metrics.json'):
global g_image_size
g_image_size = img_size
@@ -148,7 +149,7 @@ def run(
# Pipeline Metric
info('Writing Pipeline Metric')
with file_io.FileIO('/mlpipeline-metrics.json', 'w') as f:
with file_io.FileIO(metrics_file, 'w') as f:
json.dump(metrics, f)
# save model
@@ -201,7 +202,10 @@ if __name__ == "__main__":
default=0.0001, type=float)
parser.add_argument('-o', '--outputs',
help='output directory', default='model')
parser.add_argument('-f', '--dataset', help='cleaned data listing')
parser.add_argument('-f', '--dataset', help='input dataset')
parser.add_argument('-m', '--model', help='output model info')
parser.add_argument('-u', '--ui_metadata', help='ui metadata')
parser.add_argument('-me', '--metrics', help='model metrics')
args = parser.parse_args()
info('Using TensorFlow v.{}'.format(tf.__version__))
@@ -212,6 +216,15 @@ if __name__ == "__main__":
dataset = Path(args.base_path).joinpath(args.dataset)
image_size = args.image_size
output_model_file = Path(args.model).resolve(strict=False)
Path(output_model_file).parent.mkdir(parents=True, exist_ok=True)
ui_metadata_file = Path(args.ui_metadata).resolve(strict=False)
Path(ui_metadata_file).parent.mkdir(parents=True, exist_ok=True)
metrics_file = Path(args.metrics).resolve(strict=False)
Path(metrics_file).parent.mkdir(parents=True, exist_ok=True)
params = Path(args.base_path).joinpath('params.json')
args = {
@@ -221,7 +234,8 @@ if __name__ == "__main__":
"batch_size": args.batch,
"learning_rate": args.lr,
"output": str(target_path),
"dset": str(dataset)
"dset": str(dataset),
"metrics_file": str(metrics_file)
}
dataset_signature = generate_hash(dataset, 'kf_pipeline')
@@ -257,7 +271,13 @@ if __name__ == "__main__":
}]
}
with open('/mlpipeline-ui-metadata.json', 'w') as f:
# Path(output_path).parent.mkdir(parents=True, exist_ok=True)
with open(str(ui_metadata_file), 'w') as f:
json.dump(metadata, f)
# python train.py -d train -e 3 -b 32 -l 0.0001 -o model -f train.txt
model_output_content = []
for filename in target_path.iterdir():
model_output_content.append(str(filename))
with open(str(output_model_file), 'w+') as f:
f.write('\n'.join(model_output_content))

code/utils/kfp_helper.py (new file)

@@ -0,0 +1,48 @@
def use_databricks_secret(secret_name='databricks-secret'):
    def _use_databricks_secret(task):
        from kubernetes import client as k8s_client
        (
            task.container
                .add_env_variable(
                    k8s_client.V1EnvVar(
                        name='DATABRICKS_HOST',
                        value_from=k8s_client.V1EnvVarSource(
                            secret_key_ref=k8s_client.V1SecretKeySelector(
                                name=secret_name,
                                key='DATABRICKS_HOST'
                            )
                        )
                    )
                )
                .add_env_variable(  # noqa: E131
                    k8s_client.V1EnvVar(
                        name='DATABRICKS_TOKEN',
                        value_from=k8s_client.V1EnvVarSource(
                            secret_key_ref=k8s_client.V1SecretKeySelector(
                                name=secret_name,
                                key='DATABRICKS_TOKEN'
                            )
                        )
                    )
                )
                .add_env_variable(  # noqa: E131
                    k8s_client.V1EnvVar(
                        name='CLUSTER_ID',
                        value_from=k8s_client.V1EnvVarSource(
                            secret_key_ref=k8s_client.V1SecretKeySelector(
                                name=secret_name,
                                key='CLUSTER_ID'
                            )
                        )
                    )
                )
        )
        return task
    return _use_databricks_secret


def use_image(image_name):
    def _use_image(task):
        task.image = image_name
        return task
    return _use_image
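A short usage sketch for these helpers, mirroring how the pipeline files above apply them to a loaded component task (the image tag here is illustrative):

import kfp.dsl as dsl

# databricks_op is assumed to be a component factory created with
# kfp.components.load_component_from_file, as in the pipeline files above.
task = databricks_op(run_id=dsl.RUN_ID_PLACEHOLDER,
                     notebook_params='{"argument_one":"param one","argument_two":"param two"}')
# use_databricks_secret wires DATABRICKS_HOST/DATABRICKS_TOKEN/CLUSTER_ID from the
# Kubernetes secret into the container; use_image overrides the image declared in component.yaml.
task = task.apply(use_databricks_secret()) \
           .apply(use_image('kubeflowyoacr.azurecr.io/mexicanfood/databricks-notebook:latest'))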