This commit is contained in:
Louie Larson 2023-03-22 13:33:41 -04:00 коммит произвёл GitHub
Родитель e466fbe38f
Коммит 9d69ea7398
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
1 изменённых файлов: 63 добавлений и 71 удалений

Просмотреть файл

@ -19,7 +19,7 @@ import azureml.assets as assets
from azureml.assets.model.mlflow_utils import MLFlowModelUtils
import azureml.assets.util as util
import yaml
from azureml.assets.config import PathType
from azureml.assets.config import AssetConfig, PathType
from azureml.assets.model import ModelDownloadUtils
from azureml.assets.util import logger
from azure.ai.ml import load_component, load_model
@ -29,7 +29,7 @@ from azure.ai.ml.entities import Component, Environment, Model
ASSET_ID_TEMPLATE = Template("azureml://registries/$registry_name/$asset_type/$asset_name/versions/$version")
TEST_YML = "tests.yml"
PROD_SYSTEM_REGISTRY = "azureml"
PUBLISH_ORDER = [assets.AssetType.ENVIRONMENT, assets.AssetType.COMPONENT, assets.AssetType.MODEL]
CREATE_ORDER = [assets.AssetType.ENVIRONMENT, assets.AssetType.COMPONENT, assets.AssetType.MODEL]
WORKSPACE_ASSET_PATTERN = re.compile(r"^(?:azureml:)?(.+)(?::(.+)|@(.+))$")
REGISTRY_ENV_PATTERN = re.compile(r"^azureml://registries/(.+)/environments/(.+)/(?:versions/(.+)|labels/(.+))")
BEARER = r"Bearer.*"
@ -40,7 +40,7 @@ def find_test_files(dir: Path):
test_jobs = []
for test in dir.iterdir():
logger.print("processing test folder: " + test.name)
logger.print(f"Processing test folder {test.name}")
with open(test / TEST_YML) as fp:
data = yaml.load(fp, Loader=yaml.FullLoader)
for test_group in data.values():
@ -53,16 +53,15 @@ def find_test_files(dir: Path):
def preprocess_test_files(test_jobs, asset_ids: dict):
"""Preprocess test files to generate asset ids."""
for test_job in test_jobs:
logger.print(f"processing test job: {test_job}")
logger.print(f"Processing test job {test_job}")
with open(test_job) as fp:
data = yaml.load(fp, Loader=yaml.FullLoader)
for job_name, job in data['jobs'].items():
asset_name = job['component']
logger.print(f"processing asset {asset_name}")
logger.print(f"Processing asset {asset_name}")
if asset_name in asset_ids:
job['component'] = asset_ids.get(asset_name)
logger.print(
f"for job {job_name}, the new asset id: {job['component']}")
logger.print(f"For job {job_name}, the new asset id is {job['component']}")
with open(test_job, "w") as file:
yaml.dump(data, file, default_flow_style=False,
sort_keys=False)
@ -90,12 +89,12 @@ def update_spec(asset: Union[Component, Environment, Model], spec_path: Path) ->
util.dump_yaml(asset_dict, spec_path)
return True
except Exception as e:
logger.log_error(f"Failed to update spec => {e}")
logger.log_error(f"Failed to update spec: {e}")
return False
def prepare_model(model_config: assets.ModelConfig, spec_file_path: Path, model_dir: Path) -> bool:
"""Prepare Model.
"""Prepare model.
:param model_config: Model Config object
:type model_config: assets.ModelConfig
@ -103,7 +102,7 @@ def prepare_model(model_config: assets.ModelConfig, spec_file_path: Path, model_
:type spec_file_path: Path
:param model_dir: path of directory where model is present locally or can be downloaded to.
:type model_dir: Path
:return: If model can be published to registry.
:return: Model successfully prepared for creation in registry.
:rtype: bool
"""
try:
@ -111,7 +110,7 @@ def prepare_model(model_config: assets.ModelConfig, spec_file_path: Path, model_
# TODO: temp fix before restructuring what attributes are required in model config and spec.
model.type = model_config.type.value
except Exception as e:
logger.error(f"Error in loading model spec file at {spec_file_path} => {e}")
logger.error(f"Error in loading model spec file at {spec_file_path}: {e}")
return False
if model_config.path.type == PathType.LOCAL:
@ -119,14 +118,13 @@ def prepare_model(model_config: assets.ModelConfig, spec_file_path: Path, model_
return update_spec(model, spec_file_path)
if model_config.type == assets.ModelType.CUSTOM:
can_publish_model = ModelDownloadUtils.download_model(model_config.path.type, model_config.path.uri, model_dir)
if can_publish_model:
success = ModelDownloadUtils.download_model(model_config.path.type, model_config.path.uri, model_dir)
if success:
model.path = model_dir
can_publish_model = update_spec(model, spec_file_path)
success = update_spec(model, spec_file_path)
elif model_config.type == assets.ModelType.MLFLOW:
can_publish_model = ModelDownloadUtils.download_model(model_config.path.type, model_config.path.uri, model_dir)
if can_publish_model:
success = ModelDownloadUtils.download_model(model_config.path.type, model_config.path.uri, model_dir)
if success:
model.path = model_dir / MLFlowModelUtils.MLFLOW_MODEL_PATH
if not model_config.flavors:
# try fetching flavors from MLModel file
@ -135,15 +133,13 @@ def prepare_model(model_config: assets.ModelConfig, spec_file_path: Path, model_
mlmodel = util.load_yaml(file_path=mlmodel_file_path)
model.flavors = mlmodel.get("flavors")
except Exception as e:
logger.log_error(f"Error loading flavors from MLmodel file at: {mlmodel_file_path} => {e}")
can_publish_model = update_spec(model, spec_file_path)
logger.log_error(f"Error loading flavors from MLmodel file at {mlmodel_file_path}: {e}")
success = update_spec(model, spec_file_path)
else:
logger.print(model_config.type.value, assets.ModelType.MLFLOW)
can_publish_model = False
logger.log_error(f"Model type {model_config.type} not supported")
logger.log_error(f"Model type {model_config.type.value} not supported")
success = False
return can_publish_model
return success
def validate_update_command_component(
@ -158,9 +154,9 @@ def validate_update_command_component(
:type component: Component
:param spec_path: Path of loaded component
:type spec_path: Path
:param final_version: Final version string used to register component
:param final_version: Final version string used to create component
:type final_version: str
:param registry_name: name of the registry to publish component to
:param registry_name: name of the registry to create component in
:type registry_name: str
:return: True for successful validation and update
:rtype: bool
@ -174,7 +170,7 @@ def validate_update_command_component(
elif (match := WORKSPACE_ASSET_PATTERN.match(env)) is not None:
env_name, env_version, env_label = match.group(1), match.group(2), match.group(3)
else:
logger.print(f"Env ID doesn't match workspace or registry pattern in {asset.spec_with_path}")
logger.log_error(f"Env ID doesn't match workspace or registry pattern in {asset.spec_with_path}")
return False
logger.print(
@ -182,8 +178,8 @@ def validate_update_command_component(
)
if env_registry_name and env_registry_name not in [PROD_SYSTEM_REGISTRY, registry_name]:
logger.log_warning(
"Unexpected !!! Registry name for component's env URI must be either "
logger.log_error(
"Registry name for component's env URI must be either "
+ f"'{registry_name}' or '{PROD_SYSTEM_REGISTRY}'. Got '{env_registry_name}'"
)
return False
@ -193,26 +189,26 @@ def validate_update_command_component(
if env_label:
# TODO: Add fetching env from label
# https://github.com/Azure/azureml-assets/issues/415
logger.print("Unexpected !!! Registering a component with env label is not supported.")
logger.log_error("Creating a component with env label is not supported")
return False
env = None
# Check if component's env is registered
# Check if component's env exists
for version in [env_version, final_version]:
if (env := get_registered_asset_details(
if (env := get_asset_details(
assets.AssetType.ENVIRONMENT.value, env_name, version, registry_name
)) is not None:
break
if not env:
logger.print(f"Could not find a registered env for {component.name}. Please retry again!!!")
logger.log_error(f"Could not find the env for {component.name}")
return False
env_id = env["id"]
logger.print(f"Updating component env to {env_id}")
component.environment = env_id
if not update_spec(component, spec_path):
logger.print(f"Component update failed for asset spec path: {asset.spec_path}")
logger.log_error(f"Component update failed for asset spec path: {asset.spec_path}")
return False
return True
@ -223,7 +219,7 @@ def run_command(cmd: List[str]):
return result
def asset_publish_command(
def asset_create_command(
asset_type: str,
asset_path: str,
registry_name: str,
@ -248,7 +244,7 @@ def asset_publish_command(
return cmd
def publish_asset(
def create_asset(
asset,
registry_name,
resource_group,
@ -257,8 +253,8 @@ def publish_asset(
failure_list,
debug_mode: bool = None
):
"""Publish asset to registry."""
cmd = asset_publish_command(
"""Create asset in registry."""
cmd = asset_create_command(
asset.type.value, str(asset.spec_with_path),
registry_name, version, resource_group, workspace_name, debug_mode
)
@ -271,22 +267,22 @@ def publish_asset(
redacted_output = sanitize_output(result.stdout)
redacted_err = sanitize_output(result.stderr)
if redacted_output:
logger.print(f"stdout:\n{redacted_output}")
logger.print(f"STDOUT: {redacted_output}")
if redacted_err:
logger.print(f"stderr:\n{redacted_err}")
logger.print(f"STDERR: {redacted_err}")
if result.returncode != 0:
logger.log_warning(f"Error creating {asset.type.value} : {asset.name}. Error {redacted_err}")
logger.log_error(f"Error creating {asset.type.value} {asset.name}: {redacted_err}")
failure_list.append(asset)
def get_registered_asset_details(
def get_asset_details(
asset_type: str,
asset_name: str,
asset_version: str,
registry_name: str,
) -> Dict:
"""Return registered asset details."""
"""Get asset details."""
cmd = [
"az", "ml", asset_type, "show",
"--name", asset_name,
@ -295,7 +291,7 @@ def get_registered_asset_details(
]
result = run_command(cmd)
if result.returncode != 0:
logger.log_warning(f"Error in fetching asset details. Error:\n{result.stderr}")
logger.log_error(f"Failed to get asset details: {result.stderr}")
return None
return json.loads(result.stdout)
@ -353,43 +349,41 @@ if __name__ == "__main__":
failed_list_file = args.failed_list
debug_mode = args.debug
asset_ids = {}
logger.print("publishing assets")
# Load publishing list from deploy config
if publish_list_file:
with open(publish_list_file) as fp:
config = yaml.load(fp, Loader=yaml.FullLoader)
publish_list = config.get('create', {})
create_list = config.get('create', {})
else:
publish_list = {}
create_list = {}
# Check publishing list
if not publish_list:
# Check create list
if not create_list:
logger.log_warning("The create list is empty.")
exit(0)
logger.print(f"create list: {publish_list}")
logger.print(f"create list: {create_list}")
failure_list = []
all_assets = util.find_assets(input_dirs=assets_dir)
assets_by_type = defaultdict(list)
assets_by_type: Dict[str, List[AssetConfig]] = defaultdict(list)
for asset in all_assets:
assets_by_type[asset.type.value].append(asset)
for publish_asset_type in PUBLISH_ORDER:
logger.print(f"now publishing {publish_asset_type.value}s.")
if publish_asset_type.value not in publish_list:
for create_asset_type in CREATE_ORDER:
logger.print(f"Creating {create_asset_type.value}s.")
if create_asset_type.value not in create_list:
continue
for asset in assets_by_type.get(publish_asset_type.value, []):
for asset in assets_by_type.get(create_asset_type.value, []):
with TemporaryDirectory() as work_dir:
asset_names = publish_list.get(asset.type.value, [])
asset_names = create_list.get(asset.type.value, [])
if not ("*" in asset_names or asset.name in asset_names):
logger.print(
f"Skipping registering asset {asset.name} because it is not in the publish list")
f"Skipping asset {asset.name} because it is not in the create list")
continue
logger.print(f"Registering {asset}")
final_version = asset.version + "-" + \
passed_version if passed_version else asset.version
logger.print(f"final version: {final_version}")
logger.print(f"Creating {asset.name} {final_version}")
asset_ids[asset.name] = ASSET_ID_TEMPLATE.substitute(
registry_name=registry_name,
asset_type=f"{asset.type.value}s",
@ -400,7 +394,6 @@ if __name__ == "__main__":
# Handle specific asset types
if asset.type == assets.AssetType.COMPONENT:
# load component and check if environment exists
logger.print(f"spec's path: {asset.spec_with_path}")
component = load_component(asset.spec_with_path)
if component.type == "command":
if not validate_update_command_component(
@ -409,23 +402,22 @@ if __name__ == "__main__":
failure_list.append(asset)
continue
elif asset.type == assets.AssetType.MODEL:
# check if model is already registered
# Check if model already exists
final_version = asset.version
if get_registered_asset_details(asset.type.value, asset.name, final_version, registry_name):
logger.print(f"Version already registered. Skipping publish for asset: {asset.name}")
if get_asset_details(asset.type.value, asset.name, final_version, registry_name):
logger.print(f"{asset.name} {final_version} already exists, skipping")
continue
try:
model_config = asset.extra_config_as_object()
if not prepare_model(model_config, asset.spec_with_path, Path(work_dir)):
raise Exception(f"Could not prepare model at {asset.spec_with_path}")
except Exception as e:
logger.log_error(f"Model prepare exception. Error => {e}")
logger.log_error(f"Model prepare exception: {e}")
failure_list.append(asset)
continue
# publish asset
publish_asset(
# Create asset
create_asset(
asset=asset,
version=final_version,
registry_name=registry_name,
@ -441,18 +433,18 @@ if __name__ == "__main__":
failed_assets[asset.type.value].append(asset.name)
for asset_type, asset_names in failed_assets.items():
logger.log_warning(f"Failed to register {asset_type}s: {asset_names}")
logger.log_warning(f"Failed to create {asset_type}s: {asset_names}")
# The following dump generates a YAML file for the report process
# that runs at the end of the publishing script.
with open(failed_list_file, "w") as file:
yaml.dump(failed_assets, file, default_flow_style=False, sort_keys=False)
if tests_dir:
logger.print("locating test files")
logger.print("Locating test files")
test_jobs = find_test_files(tests_dir)
logger.print("preprocessing test files")
logger.print("Preprocessing test files")
preprocess_test_files(test_jobs, asset_ids)
logger.print("finished preprocessing test files")
logger.print("Finished preprocessing test files")
else:
logger.log_warning("Test files not found")