Use spec's type over loading component (#518)

* Use spec's type over loading component

* Fix flakes

* pipeline may have inline component definition

* Update scripts/release/asset_publish.py

Co-authored-by: Louie Larson <elesel@gmail.com>

---------

Co-authored-by: Sasidhar Kasturi <sasik@microsoft.com>
Co-authored-by: Louie Larson <elesel@gmail.com>
This commit is contained in:
Ayush Mishra 2023-04-07 16:23:10 +05:30 коммит произвёл GitHub
Родитель bbfb083f34
Коммит 281e512c11
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
1 изменённых файлов: 35 добавлений и 16 удалений

Просмотреть файл

@ -22,7 +22,7 @@ import yaml
from azureml.assets.config import AssetConfig, PathType
from azureml.assets.model import ModelDownloadUtils
from azureml.assets.util import logger
from azure.ai.ml import load_component, load_model
from azure.ai.ml import load_model
from azure.ai.ml.entities import Component, Environment, Model
@ -169,7 +169,11 @@ def validate_and_prepare_pipeline_component(
:rtype: bool
"""
with open(spec_path) as f:
pipeline_dict = yaml.safe_load(f)
try:
pipeline_dict = yaml.safe_load(f)
except Exception:
logger.log_error(f"Error in loading component spec at {spec_path}")
return False
jobs = pipeline_dict['jobs']
logger.print(f"Preparing pipeline component {pipeline_dict['name']}")
@ -177,6 +181,12 @@ def validate_and_prepare_pipeline_component(
for job_name, job_details in jobs.items():
logger.print(f"job {job_name}")
if not job_details.get('component'):
# if-else or inline component
logger.print(f"component not defined for job {job_name}")
updated_jobs[job_name] = job_details
continue
try:
name, version, label, registry = get_parsed_details_from_asset_uri(
assets.AssetType.COMPONENT.value, job_details['component'])
@ -214,7 +224,6 @@ def validate_and_prepare_pipeline_component(
)
return False
# logger.print(asset_details)
updated_jobs[job_name] = job_details
updated_jobs[job_name]['component'] = asset_details["id"]
@ -229,15 +238,12 @@ def validate_and_prepare_pipeline_component(
def validate_update_command_component(
component: Component,
spec_path: Path,
final_version: str,
registry_name: str,
) -> bool:
"""Validate and update command component spec.
:param component: A command component
:type component: Component
:param spec_path: Path of loaded component
:type spec_path: Path
:param final_version: Final version string used to create component
@ -247,9 +253,20 @@ def validate_update_command_component(
:return: True for successful validation and update
:rtype: bool
"""
with open(spec_path) as f:
try:
component_dict = yaml.safe_load(f)
except Exception:
logger.log_error(f"Error in loading component spec at {spec_path}")
return False
component_name = component_dict['name']
component_env = component_dict['environment']
logger.print(f"Preparing command component {component_name}")
try:
env_name, env_version, env_label, env_registry_name = get_parsed_details_from_asset_uri(
assets.AssetType.ENVIRONMENT.value, component.environment)
assets.AssetType.ENVIRONMENT.value, component_env)
except Exception as e:
logger.log_error(e)
return False
@ -293,13 +310,15 @@ def validate_update_command_component(
break
if not env:
logger.log_error(f"Could not find the env for {component.name}")
logger.log_error(f"Could not find the env for {component_name}")
return False
env_id = env["id"]
logger.print(f"Updating component env to {env_id}")
component.environment = env_id
if not update_spec(component, spec_path):
logger.print(f"Updating component env to {env['id']}")
component_dict['environment'] = env['id']
try:
util.dump_yaml(component_dict, spec_path)
except Exception:
logger.log_error(f"Component update failed for asset spec path: {asset.spec_path}")
return False
return True
@ -533,16 +552,16 @@ if __name__ == "__main__":
# Handle specific asset types
if asset.type == assets.AssetType.COMPONENT:
# load component and check if environment exists
component = load_component(asset.spec_with_path)
if component.type == assets.ComponentType.PIPELINE.value:
component_type = asset.spec_as_object().type
if component_type == assets.ComponentType.PIPELINE.value:
if not validate_and_prepare_pipeline_component(
asset.spec_with_path, final_version, registry_name
):
failure_list.append(asset)
continue
elif component.type is None or component.type == assets.ComponentType.COMMAND.value:
elif component_type is None or component_type == assets.ComponentType.COMMAND.value:
if not validate_update_command_component(
component, asset.spec_with_path, final_version, registry_name
asset.spec_with_path, final_version, registry_name
):
failure_list.append(asset)
continue