diff --git a/scripts/release/asset_publish.py b/scripts/release/asset_publish.py index 607b216665..cd9f075c82 100644 --- a/scripts/release/asset_publish.py +++ b/scripts/release/asset_publish.py @@ -19,7 +19,7 @@ import azureml.assets as assets from azureml.assets.model.mlflow_utils import MLFlowModelUtils import azureml.assets.util as util import yaml -from azureml.assets.config import PathType +from azureml.assets.config import AssetConfig, PathType from azureml.assets.model import ModelDownloadUtils from azureml.assets.util import logger from azure.ai.ml import load_component, load_model @@ -29,7 +29,7 @@ from azure.ai.ml.entities import Component, Environment, Model ASSET_ID_TEMPLATE = Template("azureml://registries/$registry_name/$asset_type/$asset_name/versions/$version") TEST_YML = "tests.yml" PROD_SYSTEM_REGISTRY = "azureml" -PUBLISH_ORDER = [assets.AssetType.ENVIRONMENT, assets.AssetType.COMPONENT, assets.AssetType.MODEL] +CREATE_ORDER = [assets.AssetType.ENVIRONMENT, assets.AssetType.COMPONENT, assets.AssetType.MODEL] WORKSPACE_ASSET_PATTERN = re.compile(r"^(?:azureml:)?(.+)(?::(.+)|@(.+))$") REGISTRY_ENV_PATTERN = re.compile(r"^azureml://registries/(.+)/environments/(.+)/(?:versions/(.+)|labels/(.+))") BEARER = r"Bearer.*" @@ -40,7 +40,7 @@ def find_test_files(dir: Path): test_jobs = [] for test in dir.iterdir(): - logger.print("processing test folder: " + test.name) + logger.print(f"Processing test folder {test.name}") with open(test / TEST_YML) as fp: data = yaml.load(fp, Loader=yaml.FullLoader) for test_group in data.values(): @@ -53,16 +53,15 @@ def find_test_files(dir: Path): def preprocess_test_files(test_jobs, asset_ids: dict): """Preprocess test files to generate asset ids.""" for test_job in test_jobs: - logger.print(f"processing test job: {test_job}") + logger.print(f"Processing test job {test_job}") with open(test_job) as fp: data = yaml.load(fp, Loader=yaml.FullLoader) for job_name, job in data['jobs'].items(): asset_name = job['component'] - logger.print(f"processing asset {asset_name}") + logger.print(f"Processing asset {asset_name}") if asset_name in asset_ids: job['component'] = asset_ids.get(asset_name) - logger.print( - f"for job {job_name}, the new asset id: {job['component']}") + logger.print(f"For job {job_name}, the new asset id is {job['component']}") with open(test_job, "w") as file: yaml.dump(data, file, default_flow_style=False, sort_keys=False) @@ -90,12 +89,12 @@ def update_spec(asset: Union[Component, Environment, Model], spec_path: Path) -> util.dump_yaml(asset_dict, spec_path) return True except Exception as e: - logger.log_error(f"Failed to update spec => {e}") + logger.log_error(f"Failed to update spec: {e}") return False def prepare_model(model_config: assets.ModelConfig, spec_file_path: Path, model_dir: Path) -> bool: - """Prepare Model. + """Prepare model. :param model_config: Model Config object :type model_config: assets.ModelConfig @@ -103,7 +102,7 @@ def prepare_model(model_config: assets.ModelConfig, spec_file_path: Path, model_ :type spec_file_path: Path :param model_dir: path of directory where model is present locally or can be downloaded to. :type model_dir: Path - :return: If model can be published to registry. + :return: Model successfully prepared for creation in registry. :rtype: bool """ try: @@ -111,7 +110,7 @@ def prepare_model(model_config: assets.ModelConfig, spec_file_path: Path, model_ # TODO: temp fix before restructuring what attributes are required in model config and spec. model.type = model_config.type.value except Exception as e: - logger.error(f"Error in loading model spec file at {spec_file_path} => {e}") + logger.error(f"Error in loading model spec file at {spec_file_path}: {e}") return False if model_config.path.type == PathType.LOCAL: @@ -119,14 +118,13 @@ def prepare_model(model_config: assets.ModelConfig, spec_file_path: Path, model_ return update_spec(model, spec_file_path) if model_config.type == assets.ModelType.CUSTOM: - can_publish_model = ModelDownloadUtils.download_model(model_config.path.type, model_config.path.uri, model_dir) - if can_publish_model: + success = ModelDownloadUtils.download_model(model_config.path.type, model_config.path.uri, model_dir) + if success: model.path = model_dir - can_publish_model = update_spec(model, spec_file_path) - + success = update_spec(model, spec_file_path) elif model_config.type == assets.ModelType.MLFLOW: - can_publish_model = ModelDownloadUtils.download_model(model_config.path.type, model_config.path.uri, model_dir) - if can_publish_model: + success = ModelDownloadUtils.download_model(model_config.path.type, model_config.path.uri, model_dir) + if success: model.path = model_dir / MLFlowModelUtils.MLFLOW_MODEL_PATH if not model_config.flavors: # try fetching flavors from MLModel file @@ -135,15 +133,13 @@ def prepare_model(model_config: assets.ModelConfig, spec_file_path: Path, model_ mlmodel = util.load_yaml(file_path=mlmodel_file_path) model.flavors = mlmodel.get("flavors") except Exception as e: - logger.log_error(f"Error loading flavors from MLmodel file at: {mlmodel_file_path} => {e}") - can_publish_model = update_spec(model, spec_file_path) - + logger.log_error(f"Error loading flavors from MLmodel file at {mlmodel_file_path}: {e}") + success = update_spec(model, spec_file_path) else: - logger.print(model_config.type.value, assets.ModelType.MLFLOW) - can_publish_model = False - logger.log_error(f"Model type {model_config.type} not supported") + logger.log_error(f"Model type {model_config.type.value} not supported") + success = False - return can_publish_model + return success def validate_update_command_component( @@ -158,9 +154,9 @@ def validate_update_command_component( :type component: Component :param spec_path: Path of loaded component :type spec_path: Path - :param final_version: Final version string used to register component + :param final_version: Final version string used to create component :type final_version: str - :param registry_name: name of the registry to publish component to + :param registry_name: name of the registry to create component in :type registry_name: str :return: True for successful validation and update :rtype: bool @@ -174,7 +170,7 @@ def validate_update_command_component( elif (match := WORKSPACE_ASSET_PATTERN.match(env)) is not None: env_name, env_version, env_label = match.group(1), match.group(2), match.group(3) else: - logger.print(f"Env ID doesn't match workspace or registry pattern in {asset.spec_with_path}") + logger.log_error(f"Env ID doesn't match workspace or registry pattern in {asset.spec_with_path}") return False logger.print( @@ -182,8 +178,8 @@ def validate_update_command_component( ) if env_registry_name and env_registry_name not in [PROD_SYSTEM_REGISTRY, registry_name]: - logger.log_warning( - "Unexpected !!! Registry name for component's env URI must be either " + logger.log_error( + "Registry name for component's env URI must be either " + f"'{registry_name}' or '{PROD_SYSTEM_REGISTRY}'. Got '{env_registry_name}'" ) return False @@ -193,26 +189,26 @@ def validate_update_command_component( if env_label: # TODO: Add fetching env from label # https://github.com/Azure/azureml-assets/issues/415 - logger.print("Unexpected !!! Registering a component with env label is not supported.") + logger.log_error("Creating a component with env label is not supported") return False env = None - # Check if component's env is registered + # Check if component's env exists for version in [env_version, final_version]: - if (env := get_registered_asset_details( + if (env := get_asset_details( assets.AssetType.ENVIRONMENT.value, env_name, version, registry_name )) is not None: break if not env: - logger.print(f"Could not find a registered env for {component.name}. Please retry again!!!") + logger.log_error(f"Could not find the env for {component.name}") return False env_id = env["id"] logger.print(f"Updating component env to {env_id}") component.environment = env_id if not update_spec(component, spec_path): - logger.print(f"Component update failed for asset spec path: {asset.spec_path}") + logger.log_error(f"Component update failed for asset spec path: {asset.spec_path}") return False return True @@ -223,7 +219,7 @@ def run_command(cmd: List[str]): return result -def asset_publish_command( +def asset_create_command( asset_type: str, asset_path: str, registry_name: str, @@ -248,7 +244,7 @@ def asset_publish_command( return cmd -def publish_asset( +def create_asset( asset, registry_name, resource_group, @@ -257,8 +253,8 @@ def publish_asset( failure_list, debug_mode: bool = None ): - """Publish asset to registry.""" - cmd = asset_publish_command( + """Create asset in registry.""" + cmd = asset_create_command( asset.type.value, str(asset.spec_with_path), registry_name, version, resource_group, workspace_name, debug_mode ) @@ -271,22 +267,22 @@ def publish_asset( redacted_output = sanitize_output(result.stdout) redacted_err = sanitize_output(result.stderr) if redacted_output: - logger.print(f"stdout:\n{redacted_output}") + logger.print(f"STDOUT: {redacted_output}") if redacted_err: - logger.print(f"stderr:\n{redacted_err}") + logger.print(f"STDERR: {redacted_err}") if result.returncode != 0: - logger.log_warning(f"Error creating {asset.type.value} : {asset.name}. Error {redacted_err}") + logger.log_error(f"Error creating {asset.type.value} {asset.name}: {redacted_err}") failure_list.append(asset) -def get_registered_asset_details( +def get_asset_details( asset_type: str, asset_name: str, asset_version: str, registry_name: str, ) -> Dict: - """Return registered asset details.""" + """Get asset details.""" cmd = [ "az", "ml", asset_type, "show", "--name", asset_name, @@ -295,7 +291,7 @@ def get_registered_asset_details( ] result = run_command(cmd) if result.returncode != 0: - logger.log_warning(f"Error in fetching asset details. Error:\n{result.stderr}") + logger.log_error(f"Failed to get asset details: {result.stderr}") return None return json.loads(result.stdout) @@ -353,43 +349,41 @@ if __name__ == "__main__": failed_list_file = args.failed_list debug_mode = args.debug asset_ids = {} - logger.print("publishing assets") # Load publishing list from deploy config if publish_list_file: with open(publish_list_file) as fp: config = yaml.load(fp, Loader=yaml.FullLoader) - publish_list = config.get('create', {}) + create_list = config.get('create', {}) else: - publish_list = {} + create_list = {} - # Check publishing list - if not publish_list: + # Check create list + if not create_list: logger.log_warning("The create list is empty.") exit(0) - logger.print(f"create list: {publish_list}") + logger.print(f"create list: {create_list}") failure_list = [] all_assets = util.find_assets(input_dirs=assets_dir) - assets_by_type = defaultdict(list) + assets_by_type: Dict[str, List[AssetConfig]] = defaultdict(list) for asset in all_assets: assets_by_type[asset.type.value].append(asset) - for publish_asset_type in PUBLISH_ORDER: - logger.print(f"now publishing {publish_asset_type.value}s.") - if publish_asset_type.value not in publish_list: + for create_asset_type in CREATE_ORDER: + logger.print(f"Creating {create_asset_type.value}s.") + if create_asset_type.value not in create_list: continue - for asset in assets_by_type.get(publish_asset_type.value, []): + for asset in assets_by_type.get(create_asset_type.value, []): with TemporaryDirectory() as work_dir: - asset_names = publish_list.get(asset.type.value, []) + asset_names = create_list.get(asset.type.value, []) if not ("*" in asset_names or asset.name in asset_names): logger.print( - f"Skipping registering asset {asset.name} because it is not in the publish list") + f"Skipping asset {asset.name} because it is not in the create list") continue - logger.print(f"Registering {asset}") final_version = asset.version + "-" + \ passed_version if passed_version else asset.version - logger.print(f"final version: {final_version}") + logger.print(f"Creating {asset.name} {final_version}") asset_ids[asset.name] = ASSET_ID_TEMPLATE.substitute( registry_name=registry_name, asset_type=f"{asset.type.value}s", @@ -400,7 +394,6 @@ if __name__ == "__main__": # Handle specific asset types if asset.type == assets.AssetType.COMPONENT: # load component and check if environment exists - logger.print(f"spec's path: {asset.spec_with_path}") component = load_component(asset.spec_with_path) if component.type == "command": if not validate_update_command_component( @@ -409,23 +402,22 @@ if __name__ == "__main__": failure_list.append(asset) continue elif asset.type == assets.AssetType.MODEL: - # check if model is already registered + # Check if model already exists final_version = asset.version - if get_registered_asset_details(asset.type.value, asset.name, final_version, registry_name): - logger.print(f"Version already registered. Skipping publish for asset: {asset.name}") + if get_asset_details(asset.type.value, asset.name, final_version, registry_name): + logger.print(f"{asset.name} {final_version} already exists, skipping") continue - try: model_config = asset.extra_config_as_object() if not prepare_model(model_config, asset.spec_with_path, Path(work_dir)): raise Exception(f"Could not prepare model at {asset.spec_with_path}") except Exception as e: - logger.log_error(f"Model prepare exception. Error => {e}") + logger.log_error(f"Model prepare exception: {e}") failure_list.append(asset) continue - # publish asset - publish_asset( + # Create asset + create_asset( asset=asset, version=final_version, registry_name=registry_name, @@ -441,18 +433,18 @@ if __name__ == "__main__": failed_assets[asset.type.value].append(asset.name) for asset_type, asset_names in failed_assets.items(): - logger.log_warning(f"Failed to register {asset_type}s: {asset_names}") + logger.log_warning(f"Failed to create {asset_type}s: {asset_names}") # the following dump process will generate a yaml file for the report # process in the end of the publishing script with open(failed_list_file, "w") as file: yaml.dump(failed_assets, file, default_flow_style=False, sort_keys=False) if tests_dir: - logger.print("locating test files") + logger.print("Locating test files") test_jobs = find_test_files(tests_dir) - logger.print("preprocessing test files") + logger.print("Preprocessing test files") preprocess_test_files(test_jobs, asset_ids) - logger.print("finished preprocessing test files") + logger.print("Finished preprocessing test files") else: logger.log_warning("Test files not found")