Enable node tolerations and label selectors in the pipeline (#103)

This commit is contained in:
Tom Care 2020-10-27 14:53:28 -07:00 коммит произвёл GitHub
Родитель 06267986f4
Коммит ea021cbd07
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 67 добавлений и 25 удалений

Просмотреть файл

@ -87,6 +87,8 @@ def tacosandburritos_train(
notebook_params='{"argument_one":"param one","argument_two":"param two"}' # noqa: E501
).apply(use_databricks_secret()). \
add_init_container(get_start_callback_container()). \
set_memory_request('100M'). \
set_memory_limit('200M'). \
apply(
use_image(databricks_image_name))
@ -95,29 +97,35 @@ def tacosandburritos_train(
target=training_dataset,
image_size=image_size,
zipfile=dataset). \
set_memory_request('500M'). \
set_memory_limit('750M'). \
apply(
use_image(preprocess_image_name))
operations['preprocess'].after(operations['data processing on databricks']) # noqa: E501
operations['training'] = train_op(base_path=persistent_volume_path,
training_folder=training_folder,
epochs=2,
batch=batch,
image_size=image_size,
lr=0.0001,
model_folder=model_folder,
images=training_dataset,
dataset=operations['preprocess'].outputs['dataset']). \
set_memory_request('16G'). \
training_folder=training_folder,
epochs=2,
batch=batch,
image_size=image_size,
lr=0.0001,
model_folder=model_folder,
images=training_dataset,
dataset=operations['preprocess'].outputs['dataset']). \
set_memory_request('5G'). \
set_memory_limit('6G'). \
add_env_variable(V1EnvVar(name="RUN_ID", value=dsl.RUN_ID_PLACEHOLDER)). \
add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)). \
add_env_variable(V1EnvVar(name="GIT_PYTHON_REFRESH", value='quiet')).apply(use_image(train_image_name)) # noqa: E501, E127
add_env_variable(V1EnvVar(name="GIT_PYTHON_REFRESH", value='quiet')). \
apply(use_image(train_image_name)) # noqa: E501, E127
operations['training'].after(operations['preprocess'])
operations['evaluate'] = evaluate_op(
model=operations['training'].outputs['model'])
model=operations['training'].outputs['model']). \
set_memory_request('25M'). \
set_memory_limit('50M')
operations['evaluate'].after(operations['training'])
operations['register to AML'] = register_op(base_path=persistent_volume_path,
@ -129,20 +137,30 @@ def tacosandburritos_train(
subscription_id='$(AZ_SUBSCRIPTION_ID)',
resource_group=resource_group,
workspace=workspace,
run_id=dsl.RUN_ID_PLACEHOLDER).apply(use_azure_secret()).apply(use_image(register_images_name)) # noqa: E501, E127
run_id=dsl.RUN_ID_PLACEHOLDER). \
apply(use_azure_secret()). \
set_memory_request('100M'). \
set_memory_limit('200M'). \
apply(use_image(register_images_name)) # noqa: E501, E127
operations['register to AML'].after(operations['evaluate'])
operations['register to mlflow'] = register_mlflow_op(model='model',
model_name=model_name,
experiment_name='mexicanfood',
run_id=dsl.RUN_ID_PLACEHOLDER).apply(use_azure_secret()). \
add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)).apply(use_image(register_mlflow_image_name)) # noqa: E501
run_id=dsl.RUN_ID_PLACEHOLDER). \
apply(use_azure_secret()). \
add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)). \
set_memory_request('100M'). \
set_memory_limit('200M'). \
apply(use_image(register_mlflow_image_name)) # noqa: E501
operations['register to mlflow'].after(operations['register to AML'])
operations['finalize'] = finalize_op(callback_url=callback_url,
callback_payload=get_callback_payload("Model is registered"))
callback_payload=get_callback_payload("Model is registered")). \
set_memory_request('100M'). \
set_memory_limit('200M')
operations['finalize'].after(operations['register to mlflow'])
for _, op_1 in operations.items():
@ -155,6 +173,10 @@ def tacosandburritos_train(
)
).add_volume_mount(k8s_client.V1VolumeMount(
mount_path='/mnt/azure', name='azure'))
# Specify training node pool affinity
op_1.add_node_selector_constraint('agentpool', 'training')
# Allow all steps to be scheduled on the training node pool
op_1.add_toleration(k8s_client.V1Toleration(key='sku', value='training', effect='NoSchedule'))
if __name__ == '__main__':

Просмотреть файл

@ -64,22 +64,30 @@ def tacosandburritos_train(
tenant_id="$(AZ_TENANT_ID)",
service_principal_id="$(AZ_CLIENT_ID)",
service_principal_password="$(AZ_CLIENT_SECRET)",
pat_env="PAT_ENV"
).apply(use_azure_secret()
).apply(use_kfp_host_secret()
).apply(use_image(exit_image_name)
).apply(use_secret_var("azdopat", "PAT_ENV", "azdopat"))
pat_env="PAT_ENV"). \
set_memory_request('100M'). \
set_memory_limit('200M'). \
apply(use_azure_secret()). \
apply(use_kfp_host_secret()). \
apply(use_image(exit_image_name)). \
apply(use_secret_var("azdopat", "PAT_ENV", "azdopat"))
with dsl.ExitHandler(exit_op=exit_handler_op):
operations['mlflowproject'] = mlflow_project_op(mlflow_experiment_id=mlflow_experiment_id, # noqa: E501
kf_run_id=dsl.RUN_ID_PLACEHOLDER).apply(use_databricks_secret()).apply(use_image(mlflow_project_image_name)) # noqa: E501
kf_run_id=dsl.RUN_ID_PLACEHOLDER). \
set_memory_request('100M'). \
set_memory_limit('200M'). \
apply(use_databricks_secret()). \
apply(use_image(mlflow_project_image_name)) # noqa: E501
operations['preprocess'] = preprocess_op(base_path=persistent_volume_path, # noqa: E501
training_folder=training_folder, # noqa: E501
target=training_dataset,
image_size=image_size,
zipfile=dataset).apply(use_image(preprocess_image_name)) # noqa: E501
zipfile=dataset).apply(use_image(preprocess_image_name)). \
set_memory_request('500M'). \
set_memory_limit('750M')
operations['preprocess'].after(operations['mlflowproject']) # noqa: E501
@ -92,7 +100,8 @@ def tacosandburritos_train(
model_folder=model_folder,
images=training_dataset,
dataset=operations['preprocess'].outputs['dataset']). \
set_memory_request('16G'). \
set_memory_request('5G'). \
set_memory_limit('6G'). \
add_env_variable(V1EnvVar(name="RUN_ID", value=dsl.RUN_ID_PLACEHOLDER)). \
add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)). \
add_env_variable(V1EnvVar(name="GIT_PYTHON_REFRESH", value='quiet')). \
@ -114,7 +123,9 @@ def tacosandburritos_train(
operations['training'].after(operations['preprocess'])
operations['evaluate'] = evaluate_op(
model=operations['training'].outputs['model'])
model=operations['training'].outputs['model']). \
set_memory_request('25M'). \
set_memory_limit('50M')
operations['evaluate'].after(operations['training'])
operations['register to AML'] = register_op(base_path=persistent_volume_path,
@ -127,15 +138,20 @@ def tacosandburritos_train(
resource_group=resource_group,
workspace=workspace,
run_id=dsl.RUN_ID_PLACEHOLDER). \
set_memory_request('100M'). \
set_memory_limit('200M'). \
apply(use_azure_secret()). \
apply(use_image(register_images_name))
operations['register to AML'].after(operations['evaluate'])
operations['register to AML']. \
after(operations['evaluate'])
operations['register to mlflow'] = register_mlflow_op(model='model',
model_name=model_name,
experiment_name='mexicanfood',
run_id=dsl.RUN_ID_PLACEHOLDER). \
set_memory_request('100M'). \
set_memory_limit('200M'). \
apply(use_azure_secret()). \
add_env_variable(V1EnvVar(name="MLFLOW_TRACKING_URI", value=mlflow_url)). \
apply(use_image(register_mlflow_image_name))
@ -152,6 +168,10 @@ def tacosandburritos_train(
)
).add_volume_mount(k8s_client.V1VolumeMount(
mount_path='/mnt/azure', name='azure'))
# Specify training node pool affinity
op_1.add_node_selector_constraint('agentpool', 'training')
# Allow all steps to be scheduled on the training node pool
op_1.add_toleration(k8s_client.V1Toleration(key='sku', value='training', effect='NoSchedule'))
if __name__ == '__main__':