This commit is contained in:
Anton Schwaighofer 2024-05-31 07:37:26 -07:00
Родитель aebaf3e91e
Коммит 4070f31434
2 изменённых файлов: 68 добавлений и 72 удалений

Просмотреть файл

@ -7,10 +7,10 @@ import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
from azure.ai.ml import MLClient
from azure.ai.ml import Input, MLClient, Output
from azure.ai.ml.entities import Data
from azure.ai.ml.entities import Datastore as V2Datastore
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.ai.ml.operations import DatastoreOperations
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
from azureml.core import Dataset, Workspace, Datastore
@ -476,6 +476,71 @@ class DatasetConfig:
logger.info(status)
return result
def to_data_asset(self, ml_client: MLClient) -> Data:
    """Look up the v2 Data Asset described by the settings in the present object, creating it if needed.

    :param ml_client: An MLClient object.
    :raises ValueError: Raised if the data asset has no path.
    :return: The data asset associated with the dataset described in the present object.
    """
    version = self.version
    logger.info(
        f"Trying to access data asset {self.name} version {version}, datastore {self.datastore}"
    )
    # A falsy version is forwarded as None, in which case the latest version is retrieved.
    version_string: Optional[str] = None
    if version:
        version_string = str(version)
    asset: Data = _get_or_create_v2_data_asset(ml_client, self.datastore, self.name, version=version_string)
    if asset.path:
        return asset
    raise ValueError(f"Data asset {asset.id} has no path.")
def create_v2_inputs(ml_client: MLClient, input_datasets: List[DatasetConfig]) -> Dict[str, Input]:
    """
    Create a dictionary of Azure ML v2 Input objects, required for passing input data in to an AML job.

    :param ml_client: An MLClient object.
    :param input_datasets: A list of DatasetConfigs to convert to Inputs.
    :return: A dictionary in the format "input_name": Input. Each input name is V2_INPUT_ASSET_IDENTIFIER
        followed by the position of the dataset in `input_datasets`.
    """
    result: Dict[str, Input] = {}
    for i, dataset in enumerate(input_datasets):
        data_asset = dataset.to_data_asset(ml_client)
        # Data assets can be of type "uri_folder", "uri_file", "mltable", all of which are value types in Input
        result[f"{V2_INPUT_ASSET_IDENTIFIER}{i}"] = Input(  # type: ignore
            type=data_asset.type,  # type: ignore
            path=data_asset.path,
            # Each dataset decides individually whether it is mounted or downloaded.
            mode=InputOutputModes.MOUNT if dataset.use_mounting else InputOutputModes.DOWNLOAD,
        )
    return result
def create_v2_outputs(ml_client: MLClient, output_datasets: List[DatasetConfig]) -> Dict[str, Output]:
    """
    Create a dictionary of Azure ML v2 Output objects, required for passing output data in to an AML job.

    :param ml_client: An MLClient object.
    :param output_datasets: A list of DatasetConfigs to convert to Outputs.
    :return: A dictionary in the format "output_name": Output. Each output name is V2_OUTPUT_ASSET_IDENTIFIER
        followed by the position of the dataset in `output_datasets`.
    """
    result: Dict[str, Output] = {}
    for i, dataset in enumerate(output_datasets):
        # Resolve the asset through the DatasetConfig itself, in the same way as create_v2_inputs does.
        data_asset = dataset.to_data_asset(ml_client)
        # The data asset type ("uri_folder", "uri_file", "mltable") is passed through unchanged to Output
        result[f"{V2_OUTPUT_ASSET_IDENTIFIER}{i}"] = Output(  # type: ignore
            type=data_asset.type,  # type: ignore
            path=data_asset.path,
            mode=InputOutputModes.MOUNT,  # hard-coded to mount for now, as this is the only mode that doesn't break
        )
    return result
# Type alias for values that may be given either as a plain string or as a DatasetConfig object.
StrOrDatasetConfig = Union[str, DatasetConfig]

Просмотреть файл

@ -20,8 +20,7 @@ from pathlib import Path
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union
from azure.ai.ml import Input, MLClient, Output, command
from azure.ai.ml.constants import InputOutputModes
from azure.ai.ml.entities import Command, Data
from azure.ai.ml.entities import Command
from azure.ai.ml.entities import Environment as EnvironmentV2
from azure.ai.ml.entities import Job, Sweep, UserIdentityConfiguration
from azure.ai.ml.entities._job.distribution import DistributionConfiguration, MpiDistribution, PyTorchDistribution
@ -641,74 +640,6 @@ def _str_to_path(s: Optional[PathOrString]) -> Optional[Path]:
return s
def get_data_asset_from_config(ml_client: MLClient, dataset_config: DatasetConfig) -> Data:
    """Given a single dataset config, retrieves or creates the matching v2 data asset and returns it.

    :param ml_client: An MLClient object.
    :param dataset_config: The dataset config to get the data asset for.
    :raises ValueError: Raised if the data asset has no path.
    :return: The data asset for the given dataset config.
    """
    version = dataset_config.version
    logger.info(
        f"Trying to access data asset {dataset_config.name} version {version}, datastore {dataset_config.datastore}"
    )
    # A falsy version is forwarded as None, in which case the latest version is retrieved.
    version_string: Optional[str] = None
    if version:
        version_string = str(version)
    asset: Data = _get_or_create_v2_data_asset(
        ml_client, dataset_config.datastore, dataset_config.name, version=version_string
    )
    if asset.path:
        return asset
    raise ValueError(f"Data asset {asset.id} has no path.")
def create_v2_inputs(ml_client: MLClient, input_datasets: List[DatasetConfig]) -> Dict[str, Input]:
    """
    Build the Azure ML v2 Input objects that are needed to pass input data in to an AML job.

    :param ml_client: An MLClient object.
    :param input_datasets: A list of DatasetConfigs to convert to Inputs.
    :return: A dictionary in the format "input_name": Input.
    """
    # All data assets are resolved first, before any Input object is constructed.
    assets = [get_data_asset_from_config(ml_client, config) for config in input_datasets]
    inputs: Dict[str, Input] = {}
    for index, (config, asset) in enumerate(zip(input_datasets, assets)):
        if config.use_mounting:
            access_mode = InputOutputModes.MOUNT
        else:
            access_mode = InputOutputModes.DOWNLOAD
        # Data assets can be of type "uri_folder", "uri_file", "mltable", all of which are value types in Input
        inputs[f"{V2_INPUT_ASSET_IDENTIFIER}{index}"] = Input(  # type: ignore
            type=asset.type,  # type: ignore
            path=asset.path,
            mode=access_mode,
        )
    return inputs
def create_v2_outputs(ml_client: MLClient, output_datasets: List[DatasetConfig]) -> Dict[str, Output]:
    """
    Build the Azure ML v2 Output objects that are needed to pass output data in to an AML job.

    :param ml_client: An MLClient object.
    :param output_datasets: A list of DatasetConfigs to convert to Outputs.
    :return: A dictionary in the format "output_name": Output.
    """
    # All data assets are resolved first, before any Output object is constructed.
    assets = [get_data_asset_from_config(ml_client, config) for config in output_datasets]
    outputs: Dict[str, Output] = {}
    for index, asset in enumerate(assets):
        # The data asset type ("uri_folder", "uri_file", "mltable") is passed through unchanged to Output
        outputs[f"{V2_OUTPUT_ASSET_IDENTIFIER}{index}"] = Output(  # type: ignore
            type=asset.type,  # type: ignore
            path=asset.path,
            mode=InputOutputModes.MOUNT,  # hard-coded to mount for now, as this is the only mode that doesn't break
        )
    return outputs
def submit_to_azure_if_needed( # type: ignore
compute_cluster_name: str = "",
entry_script: Optional[PathOrString] = None,