Add function to download Run files without timeout (#113)

* Add function for download_runs (WIP)

* Add function to find file names for a Run

* download runs with single run id

* Stop output files being created during tests

* Add test with local Run

* flake8

* use DEFAULT_WORKSPACE

* move get_workspace into azure_utils to call from get_aml_run

* download_file only for first node if distributed

* Tidy up functions to get AML Run(s)

* dont cover nested functions

* Update himl-tb and himl-download with recent util changes

* Address PR comments

* Replace AML test for download_file
This commit is contained in:
mebristo 2021-09-30 11:01:18 +01:00 коммит произвёл GitHub
Родитель 5ed95e186c
Коммит f692a79e24
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
13 изменённых файлов: 777 добавлений и 237 удалений

Просмотреть файл

@ -18,6 +18,7 @@ created.
### Changed
- ([#112](https://github.com/microsoft/hi-ml/pull/112)) Update himl_tensorboard to work with files not in 'logs' directory
- ([#106](https://github.com/microsoft/hi-ml/pull/106)) Split into two packages. Most of existing package renamed to hi-ml-azure, remained remains hi-ml.
- ([#113](https://github.com/microsoft/hi-ml/pull/113)) Add helper function to download files from AML Run, tidied up some command line args, and moved some functions from himl.py to azure_util.py
### Fixed
- ([#117](https://github.com/microsoft/hi-ml/pull/117)) Bug fix: Config.json file was expected to be present, even if workspace was provided explicitly.

Просмотреть файл

@ -42,8 +42,7 @@ def test_suite_setup() -> Generator:
@pytest.fixture
def random_folder() -> Generator:
"""
Fixture to automatically create a random directory before executing a test and then
removing this directory after the test has been executed.
Fixture to automatically create a random directory before executing a test
"""
# create dirs before executing the test
folder = outputs_for_tests() / str(uuid.uuid4().hex)

Просмотреть файл

@ -48,6 +48,76 @@ ENV_NODE_RANK = "NODE_RANK"
ENV_GLOBAL_RANK = "GLOBAL_RANK"
ENV_LOCAL_RANK = "LOCAL_RANK"
RUN_CONTEXT = Run.get_context()
WORKSPACE_CONFIG_JSON = "config.json"
def _find_file(file_name: str, stop_at_pythonpath: bool = True) -> Optional[Path]:
"""
Recurse up the file system, starting at the current working directory, to find a file. Optionally stop when we hit
the PYTHONPATH root (defaults to stopping).
:param file_name: The fine name of the file to find.
:param stop_at_pythonpath: (Defaults to True.) Whether to stop at the PYTHONPATH root.
:return: The path to the file, or None if it cannot be found.
"""
def return_file_or_parent(
start_at: Path,
file_name: str,
stop_at_pythonpath: bool,
pythonpaths: List[Path]) -> Optional[Path]:
for child in start_at.iterdir():
if child.is_file() and child.name == file_name:
return child
if start_at.parent == start_at or start_at in pythonpaths:
return None
return return_file_or_parent(start_at.parent, file_name, stop_at_pythonpath, pythonpaths)
pythonpaths: List[Path] = []
if 'PYTHONPATH' in os.environ:
pythonpaths = [Path(path_string) for path_string in os.environ['PYTHONPATH'].split(os.pathsep)]
return return_file_or_parent(
start_at=Path.cwd(),
file_name=file_name,
stop_at_pythonpath=stop_at_pythonpath,
pythonpaths=pythonpaths)
def get_workspace(aml_workspace: Optional[Workspace], workspace_config_path: Optional[Path]) -> Workspace:
"""
Retrieve an Azure ML Workspace from one of several places:
1. If the function has been called during an AML run (i.e. on an Azure agent), returns the associated workspace
2. If a Workspace object has been provided by the user, return that
3. If a path to a Workspace config file has been provided, load the workspace according to that.
If not running inside AML and neither a workspace nor the config file are provided, the code will try to locate a
config.json file in any of the parent folders of the current working directory. If that succeeds, that config.json
file will be used to create the workspace.
:param aml_workspace: If provided this is returned as the AzureML Workspace.
:param workspace_config_path: If not provided with an AzureML Workspace, then load one given the information in this
config
:return: An AzureML workspace.
"""
if is_running_on_azure_agent():
return RUN_CONTEXT.experiment.workspace
if aml_workspace:
return aml_workspace
if workspace_config_path is None:
workspace_config_path = _find_file(WORKSPACE_CONFIG_JSON)
if workspace_config_path:
logging.info(f"Using the workspace config file {str(workspace_config_path.absolute())}")
else:
raise ValueError("No workspace config file given, nor can we find one.")
if workspace_config_path.is_file():
auth = get_authentication()
return Workspace.from_config(path=str(workspace_config_path), auth=auth)
raise ValueError("Workspace config file does not exist or cannot be read.")
def create_run_recovery_id(run: Run) -> str:
"""
@ -405,7 +475,7 @@ def get_most_recent_run_id(run_recovery_file: Path) -> str:
assert run_recovery_file.is_file(), "When running in cloud builds, this should pick up the ID of a previous \
training run"
run_id = run_recovery_file.read_text().strip()
print(f"Read this run ID from file: {run_id}")
logging.info(f"Read this run ID from file: {run_id}.")
return run_id
@ -416,15 +486,21 @@ def get_most_recent_run(run_recovery_file: Path, workspace: Workspace) -> Run:
:param workspace: Azure ML Workspace
:return: The Run
"""
run_recovery_id = get_most_recent_run_id(run_recovery_file)
return fetch_run(workspace=workspace, run_recovery_id=run_recovery_id)
run_or_recovery_id = get_most_recent_run_id(run_recovery_file)
# Check if the id loaded is of run_recovery_id format
if len(run_or_recovery_id.split(":")) > 1:
return fetch_run(workspace, run_or_recovery_id)
# Otherwise treat it as a run_id
return get_aml_run_from_run_id(run_or_recovery_id, aml_workspace=workspace)
class AzureRunIdSource(Enum):
LATEST_RUN_FILE = 1
EXPERIMENT_LATEST = 2
RUN_ID = 3
RUN_RECOVERY_ID = 4
RUN_IDS = 4
RUN_RECOVERY_ID = 5
RUN_RECOVERY_IDS = 6
def determine_run_id_source(args: Namespace) -> AzureRunIdSource:
@ -440,32 +516,39 @@ def determine_run_id_source(args: Namespace) -> AzureRunIdSource:
return AzureRunIdSource.LATEST_RUN_FILE
if "experiment" in args and args.experiment is not None:
return AzureRunIdSource.EXPERIMENT_LATEST
if "run_recovery_ids" in args and args.run_recovery_ids is not None:
if "run_recovery_ids" in args and args.run_recovery_ids is not None and len(args.run_recovery_ids) > 0:
return AzureRunIdSource.RUN_RECOVERY_IDS
if "run_recovery_id" in args and args.run_recovery_id is not None:
return AzureRunIdSource.RUN_RECOVERY_ID
if "run_ids" in args and args.run_ids is not None:
if "run_id" in args and args.run_id is not None:
return AzureRunIdSource.RUN_ID
raise ValueError("One of latest_run_file, experiment, run_recovery_ids or run_ids must be provided")
if "run_ids" in args and args.run_ids is not None and len(args.run_ids) > 0:
return AzureRunIdSource.RUN_IDS
raise ValueError("One of latest_run_file, experiment, run_recovery_id(s) or run_id(s) must be provided")
def get_aml_runs_from_latest_run_file(args: Namespace, workspace: Workspace) -> List[Run]:
def get_aml_run_from_latest_run_file(args: Namespace, workspace: Workspace) -> Run:
"""
Returns the most recent run that was submitted to AzureML. The function presently always returns a list of
length 1.
Returns the Run object corresponding to the id found in the most recent run file.
:param args: command line args including latest_run_file
:param workspace: An Azure ML Workspace object
:return the Run object corresponding to the id found in the most recent run file.
"""
latest_run_path = Path(args.latest_run_file)
return [get_most_recent_run(latest_run_path, workspace)]
return get_most_recent_run(latest_run_path, workspace)
def get_latest_aml_runs_from_experiment(args: Namespace, workspace: Workspace) -> List[Run]:
"""
Get latest n runs from an AML experiment
Get latest 'num_runs' runs from an AML experiment
:param args: command line args including experiment name and number of runs to return
:param workspace: AML Workspace
:raises ValueError: If Experiment experiment_name doen't exist within Worksacpe
:raises ValueError: If Experiment experiment doen't exist within Workspace
:return: List of AML Runs
"""
experiment_name = args.experiment_name
experiment_name = args.experiment
tags = args.tags or None
num_runs = args.num_runs if 'num_runs' in args else 1
@ -476,27 +559,119 @@ def get_latest_aml_runs_from_experiment(args: Namespace, workspace: Workspace) -
return list(islice(experiment.get_runs(tags=tags), num_runs))
def get_aml_runs_from_recovery_ids(args: Namespace, workspace: Workspace) -> List[Run]:
def get_aml_runs_from_recovery_ids(args: Namespace, aml_workspace: Optional[Workspace] = None,
workspace_config_path: Optional[Path] = None) -> List[Run]:
"""
Retrieve AzureML Runs for each of the run_recovery_ids specified in args.
Retrieve multiple Azure ML Runs for each of the run_recovery_ids specified in args.
:param args: command line args including experiment name and number of runs to return
:param workspace: AML Workspace
:param args: command line arguments
:param aml_workspace: Optional Azure ML Workspace object
:param workspace_config_path: Optional path containing AML Workspace settings
:return: List of AML Runs
"""
runs = [fetch_run(workspace, run_id) for run_id in args.run_recovery_ids]
def _get_run_recovery_ids_from_args(args: Namespace) -> List[str]: # pragma: no cover
"""
Retrieve a list of run recovery ids from the args as long as more than one is supplied.
:param args: The command line arguments
:return: A list of run_recovery_ids as passed in to the command line
"""
if "run_recovery_ids" not in args or len(args.run_recovery_ids) == 0:
raise ValueError("Expected to find run_recovery_ids in args but did not")
else:
return args.run_recovery_ids
workspace = get_workspace(aml_workspace=aml_workspace, workspace_config_path=workspace_config_path)
run_recovery_ids = _get_run_recovery_ids_from_args(args)
runs = [fetch_run(workspace, run_id) for run_id in run_recovery_ids]
return [r for r in runs if r is not None]
def get_aml_runs_from_runids(args: Namespace, workspace: Workspace) -> List[Run]:
def get_aml_run_from_recovery_id(args: Namespace, aml_workspace: Optional[Workspace] = None,
workspace_config_path: Optional[Path] = None) -> Run:
"""
Retrieve AzureML Runs for each of the Run Ids specified in args.
Retrieve a single Azure ML Run for the run_recovery_id specified in args.
:param args: command line arguments
:param aml_workspace: Optional Azure ML Workspace object
:param workspace_config_path: Optional path containing AML Workspace settings
:return: A single AML Run
"""
if "run_recovery_id" in args and args.run_recovery_id:
run_recovery_id = args.run_recovery_id
else:
raise ValueError("No run_recovery_id in args")
workspace = get_workspace(aml_workspace=aml_workspace, workspace_config_path=workspace_config_path)
return fetch_run(workspace, run_recovery_id)
def get_aml_run_from_run_id(run_id: str, aml_workspace: Optional[Workspace] = None,
workspace_config_path: Optional[Path] = None) -> Run:
"""
Retrieve an Azure ML Run, firstly by retrieving the corresponding Workspace, and then getting the
run according to the specified run_id. If running in AML, will take the current workspace. Otherwise, if
neither aml_workspace nor workspace_config_path are provided, will try to locate a config.json file
in any of the parent folders of the current working directory.
:param run_id: the parameter corresponding to the 'id' property of the Run
:param aml_workspace: Optional Azure ML Workspace object
:param workspace_config_path: Optional path to a Workspace config file
:return: The Azure ML Run object with the given run_id
"""
workspace = get_workspace(aml_workspace=aml_workspace, workspace_config_path=workspace_config_path)
run = workspace.get_run(run_id)
return run
def get_aml_run_from_run_id_args(args: Namespace, aml_workspace: Optional[Workspace] = None,
workspace_config_path: Optional[Path] = None) -> Run:
"""
Lookup the run_id arg and then retrieve the Azure ML Run object with this id.
:param args: Command line args
:param aml_workspace: Optional Azure ML Workspace object
:param workspace_config_path: Optional path to a Workspace config file
:return: The Azure ML Run object with the id as specified by args.run_id
"""
if "run_id" in args and args.run_id:
run_id = args.run_id
else:
raise ValueError("No run_id in args")
return get_aml_run_from_run_id(run_id, aml_workspace=aml_workspace, workspace_config_path=workspace_config_path)
def get_aml_runs_from_run_ids(args: Namespace, aml_workspace: Optional[Workspace] = None,
workspace_config_path: Optional[Path] = None) -> List[Run]:
"""
Retrieve AzureML Runs for each of the Run Ids specified in args. If running in AML, will take the
current workspace. Otherwise, if neither aml_workspace nor workspace_config_path are provided,
will try to locate a config.json file in any of the parent folders of the current working directory.
:param args: command line args including experiment name and number of runs to return
:param workspace: AML Workspace
:param aml_workspace: Optional Azure ML Workspace object
:param workspace_config_path: Optional path containing AML Workspace settings
:return: List of AML Runs
"""
runs = [workspace.get_run(r_id) for r_id in args.run_ids]
def _get_run_ids_from_args(args: Namespace) -> List[str]: # pragma: no cover
"""
Retrieve a list of run ids from the args as long as more than one is supplied.
:param args: The command line arguments
:return: A list of run_ids as passed in to the command line
"""
if len(args.run_ids) == 0:
raise ValueError("Expected to find run_ids in args but did not")
else:
return args.run_ids
workspace = get_workspace(aml_workspace=aml_workspace, workspace_config_path=workspace_config_path)
run_ids = _get_run_ids_from_args(args)
runs = [get_aml_run_from_run_id(r_id, aml_workspace=workspace) for r_id in run_ids]
return [r for r in runs if r is not None]
@ -513,13 +688,108 @@ def get_aml_runs(args: Namespace, workspace: Workspace, run_id_source: AzureRunI
:return: List of Azure ML Runs, or an empty list if none are retrieved
"""
if run_id_source == AzureRunIdSource.LATEST_RUN_FILE:
runs = get_aml_runs_from_latest_run_file(args, workspace)
runs = [get_aml_run_from_latest_run_file(args, workspace)]
elif run_id_source == AzureRunIdSource.EXPERIMENT_LATEST:
runs = get_latest_aml_runs_from_experiment(args, workspace)
elif run_id_source == AzureRunIdSource.RUN_RECOVERY_ID:
runs = [get_aml_run_from_recovery_id(args, workspace)]
elif run_id_source == AzureRunIdSource.RUN_RECOVERY_IDS:
runs = get_aml_runs_from_recovery_ids(args, workspace)
elif run_id_source == AzureRunIdSource.RUN_ID:
runs = get_aml_runs_from_runids(args, workspace)
runs = [get_aml_run_from_run_id_args(args, workspace)]
elif run_id_source == AzureRunIdSource.RUN_IDS:
runs = get_aml_runs_from_run_ids(args, workspace)
else:
raise ValueError(f"Unrecognised RunIdSource: {run_id_source}")
return [run for run in runs if run is not None]
def get_run_file_names(run: Run, prefix: str = "") -> List[str]:
"""
Get the remote path to all files for a given Run which optionally start with a given prefix
:param run: The AML Run to look up associated files for
:param prefix: The optional prefix to filter Run files by
:return: A list of paths within the Run's container
"""
all_files = run.get_file_names()
return [f for f in all_files if f.startswith(prefix)] if prefix else all_files
def download_run_files(run: Run, output_dir: Path, prefix: str = "") -> None:
"""
Download all files for a given run, which optionally start with a given prefix
:param run: The AML Run to download associated files for
:param output_dir: Local directory to which the Run files should be downloaded.
:param prefix: Optional prefix to filter Run files by
"""
run_paths = get_run_file_names(run, prefix=prefix)
if len(run_paths) == 0:
raise ValueError("No such files were found for this Run.")
for run_path in run_paths:
output_path = output_dir / run_path
download_run_file(run, run_path, output_path)
def download_run_files_from_run_id(run_id: str, output_dir: Path, prefix: str = "",
workspace: Optional[Workspace] = None,
workspace_config_path: Optional[Path] = None) -> None:
"""
For a given Azure ML run id, first retrieve the Run, and then download all files,
which optionally start with a given prefix
:param run_id: The id of the Azure ML Run
:param output_dir: Local directory to which the Run files should be downloaded.
:param prefix: Optional prefix to filter Run files by
:param workspace: Optional Azure ML Workspace object
:param workspace_config_path: Optional path to settings for Azure ML Workspace
"""
workspace = get_workspace(aml_workspace=workspace, workspace_config_path=workspace_config_path)
run = get_aml_run_from_run_id(run_id, aml_workspace=workspace)
download_run_files(run, output_dir, prefix=prefix)
def download_run_file(run: Run, filename: str, output_path: Path, validate_checksum: bool = False) -> Optional[Path]:
"""
A wrapper around AML Run's download_file method, that handles timeouts
:param run: The AML Run to download associated file for
:param filename: The name of the file as it exists in Azure storage
:param output_path: Local path to which the file should be downloaded
:param validate_checksum: Whether to validate the content from HTTP response
:return: The path to the downloaded file if local rank is zero, else None
"""
if not is_local_rank_zero():
return None
run.download_file(filename, output_file_path=output_path, _validate_checksum=validate_checksum)
return output_path
def is_global_rank_zero() -> bool:
"""
Tries to guess if the current process is running as DDP rank zero, before the training has actually started,
by looking at environment variables.
:return: True if the current process is global rank 0.
"""
# When doing multi-node training, this indicates which node the present job is on. This is set in
# set_environment_variables_for_multi_node
node_rank = os.getenv(ENV_NODE_RANK, "0")
return is_local_rank_zero() and node_rank == "0"
def is_local_rank_zero() -> bool:
"""
Tries to guess if the current process is running as DDP local rank zero (i.e., the process that is responsible for
GPU 0 on each node).
:return: True if the current process is local rank 0.
"""
# The per-node jobs for rank zero do not have any of the rank-related environment variables set. PL will
# set them only once starting its child processes.
global_rank = os.getenv(ENV_GLOBAL_RANK)
local_rank = os.getenv(ENV_LOCAL_RANK)
return global_rank is None and local_rank is None

Просмотреть файл

@ -26,10 +26,10 @@ from azureml.data import OutputFileDatasetConfig
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
from azureml.train.hyperdrive import HyperDriveConfig
from health.azure.azure_util import (create_python_environment, create_run_recovery_id,
get_authentication, is_run_and_child_runs_completed, register_environment,
from health.azure.azure_util import (create_python_environment, create_run_recovery_id, _find_file,
is_run_and_child_runs_completed, register_environment,
run_duration_string_to_seconds,
to_azure_friendly_string)
to_azure_friendly_string, RUN_CONTEXT, get_workspace)
from health.azure.datasets import (DatasetConfig, StrOrDatasetConfig, _input_dataset_key, _output_dataset_key,
_replace_string_datasets)
@ -41,11 +41,9 @@ AZUREML_COMMANDLINE_FLAG = "--azureml"
CONDA_ENVIRONMENT_FILE = "environment.yml"
LOGS_FOLDER = "logs"
OUTPUT_FOLDER = "outputs"
RUN_CONTEXT = Run.get_context()
RUN_RECOVERY_FILE = "most_recent_run.txt"
SDK_NAME = "innereye"
SDK_VERSION = "2.0"
WORKSPACE_CONFIG_JSON = "config.json"
PathOrString = Union[Path, str]
@ -346,7 +344,6 @@ def submit_to_azure_if_needed( # type: ignore
:param pip_extra_index_url: If provided, use this PIP package index to find additional packages when building
the Docker image.
:param private_pip_wheel_path: If provided, add this wheel as a private package to the AzureML workspace.
:param conda_environment_file: The file that contains the Conda environment definition.
:param default_datastore: The data store in your AzureML workspace, that points to your training data in blob
storage. This is described in more detail in the README.
:param input_datasets: The script will consume all data in folder in blob storage as the input. The folder must
@ -411,9 +408,9 @@ def submit_to_azure_if_needed( # type: ignore
workspace = get_workspace(aml_workspace, workspace_config_path)
conda_environment_file = _str_to_path(conda_environment_file)
if conda_environment_file is None:
conda_environment_file = _find_file(CONDA_ENVIRONMENT_FILE)
conda_environment_file = _str_to_path(conda_environment_file)
logging.info(f"Loaded AzureML workspace {workspace.name}")
run_config = create_run_configuration(
@ -460,38 +457,6 @@ def submit_to_azure_if_needed( # type: ignore
exit(0)
def _find_file(file_name: str, stop_at_pythonpath: bool = True) -> Optional[Path]:
"""
Recurse up the file system, starting at the current working directory, to find a file. Optionally stop when we hit
the PYTHONPATH root (defaults to stopping).
:param file_name: The fine name of the file to find.
:param stop_at_pythonpath: (Defaults to True.) Whether to stop at the PYTHONPATH root.
:return: The path to the file, or None if it cannot be found.
"""
def return_file_or_parent(
start_at: Path,
file_name: str,
stop_at_pythonpath: bool,
pythonpaths: List[Path]) -> Optional[Path]:
for child in start_at.iterdir():
if child.is_file() and child.name == file_name:
return child
if start_at.parent == start_at or start_at in pythonpaths:
return None
return return_file_or_parent(start_at.parent, file_name, stop_at_pythonpath, pythonpaths)
pythonpaths: List[Path] = []
if 'PYTHONPATH' in os.environ:
pythonpaths = [Path(path_string) for path_string in os.environ['PYTHONPATH'].split(os.pathsep)]
return return_file_or_parent(
start_at=Path.cwd(),
file_name=file_name,
stop_at_pythonpath=stop_at_pythonpath,
pythonpaths=pythonpaths)
def _write_run_recovery_file(run: Run) -> None:
"""
Write the run recovery file
@ -540,35 +505,6 @@ def _get_script_params(script_params: Optional[List[str]] = None) -> List[str]:
return [p for p in sys.argv[1:] if p != AZUREML_COMMANDLINE_FLAG]
def get_workspace(aml_workspace: Optional[Workspace], workspace_config_path: Optional[Path]) -> Workspace:
"""
Obtain the AzureML workspace from either the passed in value or the passed in path. If a workspace is provided,
it is returned as-is. If a file to a config.json file is provided, a workspace will be created based on the settings
found in config.json. If neither a workspace nor the config file are provided, the code will try to locate a
config.json file in any of the parent folders of the current working directory. If that succeeds, that config.json
file will be used to create the workspace.
:param aml_workspace: If provided this is returned as the AzureML Workspace.
:param workspace_config_path: If not provided with an AzureML Workspace, then load one given the information in this
config
:return: An AzureML workspace.
"""
if aml_workspace:
return aml_workspace
if workspace_config_path is None:
workspace_config_path = _find_file(WORKSPACE_CONFIG_JSON)
if workspace_config_path:
logging.info(f"Using the workspace config file {str(workspace_config_path.absolute())}")
else:
raise ValueError("No workspace config file given, nor can we find one.")
if workspace_config_path.is_file():
auth = get_authentication()
return Workspace.from_config(path=str(workspace_config_path), auth=auth)
raise ValueError("Workspace config file does not exist or cannot be read.")
def _generate_azure_datasets(
cleaned_input_datasets: List[DatasetConfig],
cleaned_output_datasets: List[DatasetConfig]) -> AzureRunInfo:

Просмотреть файл

@ -6,7 +6,7 @@
from argparse import ArgumentParser, Namespace
from pathlib import Path
from health.azure.azure_util import AzureRunIdSource, get_aml_runs
from health.azure.azure_util import AzureRunIdSource, download_run_files, get_aml_runs
from health.azure.himl import get_workspace
from health.azure.himl_tensorboard import determine_run_id_source
@ -83,29 +83,32 @@ def main() -> None: # pragma: no cover
required=False,
help="Optional run recovery ID of the run to download files from"
)
parser.add_argument(
"--prefix",
type=str,
default="",
required=False,
help="Optional prefix to filter Run files by"
)
args = parser.parse_args()
output_dir = Path(args.output_dir)
output_dir.mkdir(exist_ok=True)
config_path = Path(args.config_file)
if not config_path.is_file():
raise ValueError(
"You must provide a config.json file in the root folder to connect"
"to an AML workspace. This can be downloaded from your AML workspace (see README.md)"
)
workspace = get_workspace(aml_workspace=None, workspace_config_path=config_path)
run_id_source = determine_run_id_source(args)
output_path = determine_output_dir_name(args, run_id_source, output_dir)
prefix = args.prefix
run = get_aml_runs(args, workspace, run_id_source)[0]
# TODO: extend to multiple runs?
try: # pragma: no cover
run.download_files(output_directory=str(output_path))
print(f"Downloading files to {args.output_dir} ")
download_run_files(run, output_dir=output_path, prefix=prefix)
print(f"Downloaded file(s) to '{output_path}'")
except Exception as e: # pragma: no cover
raise ValueError(f"Couldn't download files from run {args.run_id}: {e}")

Просмотреть файл

@ -133,6 +133,14 @@ def main() -> None: # pragma: no cover
required=False,
help="The name of the AML Experiment that you wish to view Runs from"
)
parser.add_argument(
"--num_runs",
type=int,
default=1,
required=False,
help="Specify this in conjunction with --experiment, to specify the number of Runs to plot"
" from a given experiment"
)
parser.add_argument(
"--tags",
action="append",
@ -142,8 +150,8 @@ def main() -> None: # pragma: no cover
)
parser.add_argument(
"--run_recovery_ids",
default=None,
action='append',
default=[],
nargs="+",
required=False,
help="Optional run recovery ids of the runs to plot"
)
@ -151,6 +159,7 @@ def main() -> None: # pragma: no cover
"--run_ids",
default=[],
nargs="+",
required=False,
help="Optional run ids of the runs to plot"
)

Просмотреть файл

@ -10,24 +10,28 @@ import os
import logging
import time
from pathlib import Path
from typing import Optional, List
from typing import List, Optional, Dict
from unittest import mock
from unittest.mock import MagicMock, patch
from uuid import uuid4
import conda_merge
import health.azure.azure_util as util
import pytest
from _pytest.capture import CaptureFixture
from azureml.core import Workspace
from azureml.core import Experiment, ScriptRunConfig, Workspace
from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.environment import CondaDependencies
import health.azure.azure_util as util
from health.azure import himl
from health.azure.himl import AML_IGNORE_FILE, append_to_amlignore
from testazure.util import repository_root
from testazure.test_himl import RunTarget, render_and_run_test_script
from testazure.util import repository_root, DEFAULT_WORKSPACE, change_working_directory
RUN_ID = uuid4().hex
RUN_NUMBER = 42
EXPERIMENT_NAME = "fancy-experiment"
AML_TESTS_EXPERIMENT = "test_experiment"
def oh_no() -> None:
@ -37,6 +41,63 @@ def oh_no() -> None:
raise ValueError("Throwing an exception")
@pytest.mark.fast
def test_find_file(tmp_path: Path) -> None:
file_name = "some_file.json"
file = tmp_path / file_name
file.touch()
python_root = tmp_path / "python_root"
python_root.mkdir(exist_ok=False)
start_path = python_root / "starting_directory"
start_path.mkdir(exist_ok=False)
where_are_we_now = Path.cwd()
os.chdir(start_path)
found_file = util._find_file(file_name, False)
assert found_file
with mock.patch.dict(os.environ, {"PYTHONPATH": str(python_root.absolute())}):
found_file = util._find_file(file_name)
assert not found_file
os.chdir(where_are_we_now)
@pytest.mark.fast
@patch("health.azure.azure_util.Workspace.from_config")
@patch("health.azure.azure_util.get_authentication")
@patch("health.azure.azure_util.Workspace")
def test_get_workspace(
mock_workspace: mock.MagicMock,
mock_get_authentication: mock.MagicMock,
mock_from_config: mock.MagicMock,
tmp_path: Path) -> None:
# Test the case when running on AML
with patch("health.azure.azure_util.is_running_on_azure_agent") as mock_is_is_running_on_azure_agent:
mock_is_is_running_on_azure_agent.return_value = True
with patch("health.azure.azure_util.RUN_CONTEXT") as mock_run_context:
mock_run_context.experiment = MagicMock(workspace=mock_workspace)
workspace = util.get_workspace(None, None)
assert workspace == mock_workspace
# Test the case when a workspace object is provided
workspace = util.get_workspace(mock_workspace, None)
assert workspace == mock_workspace
# Test the case when a workspace config path is provided
mock_get_authentication.return_value = "auth"
_ = util.get_workspace(None, Path(__file__))
mock_from_config.assert_called_once_with(path=__file__, auth="auth")
# Work off a temporary directory: No config file is present
with change_working_directory(tmp_path):
with pytest.raises(ValueError) as ex:
util.get_workspace(None, None)
assert "No workspace config file given" in str(ex)
# Workspace config file is set to a file that does not exist
with pytest.raises(ValueError) as ex:
util.get_workspace(None, workspace_config_path=tmp_path / "does_not_exist")
assert "Workspace config file does not exist" in str(ex)
@patch("health.azure.azure_util.Run")
def test_create_run_recovery_id(mock_run: MagicMock) -> None:
"""
@ -171,7 +232,7 @@ def test_split_recovery_id(id: str, expected1: str, expected2: str) -> None:
def test_merge_conda(
random_folder: Path,
caplog: CaptureFixture,
) -> None:
) -> None:
"""
Tests the logic for merging Conda environment files.
"""
@ -382,7 +443,7 @@ def test_register_environment(
mock_workspace: mock.MagicMock,
mock_environment: mock.MagicMock,
caplog: CaptureFixture,
) -> None:
) -> None:
env_name = "an environment"
env_version = "an environment"
mock_environment.get.return_value = mock_environment
@ -399,7 +460,7 @@ def test_register_environment(
def test_set_environment_variables_for_multi_node(
caplog: CaptureFixture,
capsys: CaptureFixture,
) -> None:
) -> None:
with caplog.at_level(logging.INFO): # type: ignore
util.set_environment_variables_for_multi_node()
assert "No settings for the MPI central node found" in caplog.text # type: ignore
@ -435,13 +496,15 @@ class MockRun:
def __init__(self, run_id: str = 'run1234') -> None:
self.id = run_id
def download_file(self) -> None:
# for mypy
pass
def test_determine_run_id_source(tmp_path: Path) -> None:
parser = ArgumentParser()
parser.add_argument("--latest_run_file", type=str)
parser.add_argument("--experiment", type=str)
parser.add_argument("--run_recovery_ids", type=str)
parser.add_argument("--run_ids", type=str)
# If latest run path provided, expect source to be latest run file
mock_latest_run_path = tmp_path / "most_recent_run.txt"
@ -452,15 +515,31 @@ def test_determine_run_id_source(tmp_path: Path) -> None:
mock_args = parser.parse_args(["--experiment", "fake_experiment"])
assert util.determine_run_id_source(mock_args) == util.AzureRunIdSource.EXPERIMENT_LATEST
# If run recovery id is provided, expect source to be that
mock_args = parser.parse_args(["--run_recovery_ids", "experiment:run1234"])
parser = ArgumentParser()
parser.add_argument("--run_recovery_id", type=str)
parser.add_argument("--run_id", type=str)
# If single run recovery id is provided, expect source to be run_recovery_id
mock_args = parser.parse_args(["--run_recovery_id", "experiment:run1234"])
assert util.determine_run_id_source(mock_args) == util.AzureRunIdSource.RUN_RECOVERY_ID
# If run ids provided, expect source to be that
mock_args = parser.parse_args(["--run_ids", "run1234"])
# If run id provided, expect source to be run_id
mock_args = parser.parse_args(["--run_id", "run1234"])
assert util.determine_run_id_source(mock_args) == util.AzureRunIdSource.RUN_ID
# if none are provided, raise ValueError
parser = ArgumentParser()
parser.add_argument("--run_recovery_ids", nargs="+")
parser.add_argument("--run_ids", nargs="+")
# If run recovery ids are provided, expect source to be run_recovery_id
mock_args = parser.parse_args(["--run_recovery_ids", "experiment:run1234", "experiment:5432"])
assert util.determine_run_id_source(mock_args) == util.AzureRunIdSource.RUN_RECOVERY_IDS
# If run ids provided, expect source to be run_id
mock_args = parser.parse_args(["--run_ids", "run1234", "run5432"])
assert util.determine_run_id_source(mock_args) == util.AzureRunIdSource.RUN_IDS
# if none of the expected run source options are provided, assert that Exception is raised
mock_args = parser.parse_args([])
with pytest.raises(Exception):
util.determine_run_id_source(mock_args)
@ -475,18 +554,17 @@ def test_get_aml_runs_from_latest_run_file(tmp_path: Path) -> None:
parser.add_argument("--latest_run_file", type=str)
mock_args = parser.parse_args(["--latest_run_file", str(mock_latest_run_path)])
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
with mock.patch("health.azure.azure_util.fetch_run") as mock_fetch_run:
with mock.patch("health.azure.azure_util.get_aml_run_from_run_id") as mock_fetch_run:
mock_fetch_run.return_value = MockRun(mock_run_id)
aml_runs = util.get_aml_runs_from_latest_run_file(mock_args, mock_workspace)
mock_fetch_run.assert_called_once_with(workspace=mock_workspace, run_recovery_id=mock_run_id)
assert len(aml_runs) == 1
assert aml_runs[0].id == mock_run_id
aml_run = util.get_aml_run_from_latest_run_file(mock_args, mock_workspace)
mock_fetch_run.assert_called_once_with(mock_run_id, aml_workspace=mock_workspace)
assert aml_run.id == mock_run_id
# if path doesn't exist, expect error
with pytest.raises(Exception):
mock_args = parser.parse_args(["--latest_run_file", "idontexist"])
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
util.get_aml_runs_from_latest_run_file(mock_args, mock_workspace)
util.get_aml_run_from_latest_run_file(mock_args, mock_workspace)
# if arg not provided, expect error
with pytest.raises(Exception):
@ -496,17 +574,15 @@ def test_get_aml_runs_from_latest_run_file(tmp_path: Path) -> None:
def test_get_latest_aml_runs_from_experiment() -> None:
def _get_experiment_runs() -> List[MockRun]:
return [MockRun(), MockRun(), MockRun(), MockRun()]
mock_experiment_name = "MockExperiment"
parser = ArgumentParser()
parser.add_argument("--experiment_name", type=str)
parser.add_argument("--experiment", type=str)
parser.add_argument("--tags", action="append", default=[])
parser.add_argument("--num_runs", type=int, default=1)
mock_args = parser.parse_args(["--experiment_name", mock_experiment_name])
mock_args = parser.parse_args(["--experiment", mock_experiment_name])
with mock.patch("health.azure.azure_util.Experiment") as mock_experiment:
with mock.patch("health.azure.azure_util.Workspace",
experiments={mock_experiment_name: mock_experiment}
@ -517,7 +593,7 @@ def test_get_latest_aml_runs_from_experiment() -> None:
assert aml_runs[0].id == "run1234"
# Test that correct number of runs are returned if both experiment_name and num_runs are provided
mock_args = parser.parse_args(["--experiment_name", mock_experiment_name, "--num_runs", "3"])
mock_args = parser.parse_args(["--experiment", mock_experiment_name, "--num_runs", "3"])
with mock.patch("health.azure.azure_util.Experiment") as mock_experiment:
mock_experiment.get_runs.return_value = _get_experiment_runs()
with mock.patch("health.azure.azure_util.Workspace",
@ -527,8 +603,8 @@ def test_get_latest_aml_runs_from_experiment() -> None:
assert len(runs) == 3
assert runs[0].id == "run1234"
# Test that correct number of returns if both experiment_name and tags are provided
mock_args = parser.parse_args(["--experiment_name", mock_experiment_name, "--tags", "3"])
# Test that correct number of returns if both experiment and tags are provided
mock_args = parser.parse_args(["--experiment", mock_experiment_name, "--tags", "3"])
with mock.patch("health.azure.azure_util.Experiment") as mock_experiment:
mock_experiment.get_runs.return_value = _get_experiment_runs()
with mock.patch("health.azure.azure_util.Workspace",
@ -539,7 +615,7 @@ def test_get_latest_aml_runs_from_experiment() -> None:
assert runs[0].id == "run1234"
# Test that value error is raised if experiment name is not in workspace
mock_args = parser.parse_args(["--experiment_name", "idontexist"])
mock_args = parser.parse_args(["--experiment", "idontexist"])
with pytest.raises(Exception):
with mock.patch("health.azure.azure_util.Workspace",
experiments={mock_experiment_name: mock_experiment}
@ -547,48 +623,106 @@ def test_get_latest_aml_runs_from_experiment() -> None:
util.get_latest_aml_runs_from_experiment(mock_args, mock_workspace) # type: ignore
def _mock_get_most_recent_run(path: Path, workspace: Workspace) -> MockRun:
return MockRun()
def test_get_aml_runs_from_recovery_ids() -> None:
def _mock_get_most_recent_run(path: Path, workspace: Workspace) -> MockRun:
return MockRun()
parser = ArgumentParser()
parser.add_argument("--run_recovery_ids", type=str, action="append", default=None)
parser.add_argument("--run_recovery_ids", default=[], nargs="+")
# Test that the correct number of runs are returned if run_recovery_id(s) is(are) provided
mock_args = parser.parse_args(["--run_recovery_id", "expt:run123"])
# Test that the correct number of runs are returned when run_recovery_ids are provided
mock_args = parser.parse_args(["--run_recovery_ids", "expt:run123", "expt:5432"])
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
with mock.patch("health.azure.azure_util.fetch_run", _mock_get_most_recent_run):
runs = util.get_aml_runs_from_recovery_ids(mock_args, mock_workspace) # type: ignore
assert len(runs) == 1
assert runs[0].id == "run1234"
assert len(runs) == 2
assert runs[0].id == "run1234" # this is the id of the MockRun
# Test that Exception is raised if run_recovery_ids not provided
mock_args = parser.parse_args([])
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
with mock.patch("health.azure.azure_util.fetch_run", _mock_get_most_recent_run):
with pytest.raises(Exception):
util.get_aml_runs_from_recovery_ids(mock_args, mock_workspace) # type: ignore
def test_get_aml_runs_from_runids() -> None:
def test_get_aml_run_from_recovery_id() -> None:
parser = ArgumentParser()
parser.add_argument("--run_ids", action="append", default=[])
parser.add_argument("--run_recovery_id", type=str, default="")
# assert single run returned if single run id provided
# Test that a single run is returned if run_recovery_id is provided
mock_args = parser.parse_args(["--run_recovery_id", "expt:run123"])
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
with mock.patch("health.azure.azure_util.fetch_run", _mock_get_most_recent_run):
run = util.get_aml_run_from_recovery_id(mock_args, mock_workspace) # type: ignore
assert run.id == "run1234"
# Test that Exception is raised if run_recovery_id is not provided
mock_args = parser.parse_args([])
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
with mock.patch("health.azure.azure_util.fetch_run", _mock_get_most_recent_run):
with pytest.raises(Exception):
util.get_aml_run_from_recovery_id(mock_args, mock_workspace) # type: ignore
def test_get_aml_run_from_run_id() -> None:
parser = ArgumentParser()
parser.add_argument("--run_id", type=str, default="")
# assert single run returned
mock_run_id = "run123"
mock_args = parser.parse_args(["--run_ids", mock_run_id])
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
mock_workspace.get_run.return_value = MockRun(mock_run_id)
aml_runs = util.get_aml_runs_from_runids(mock_args, mock_workspace)
aml_run = util.get_aml_run_from_run_id(mock_run_id, aml_workspace=mock_workspace)
mock_workspace.get_run.assert_called_with(mock_run_id)
assert len(aml_runs) == 1
assert aml_runs[0].id == mock_run_id
assert aml_run.id == mock_run_id
# assert multiple runs returned if multiple run ids provided
mock_run_id_2 = "run456"
mock_args = parser.parse_args(["--run_ids", mock_run_id, "--run_ids", mock_run_id_2])
def test_get_aml_run_from_run_id_args() -> None:
parser = ArgumentParser()
parser.add_argument("--run_id", type=str, default="")
# assert single run returned (mock the workspace since this run doesnt really exist)
mock_run_id = "run123"
mock_args = parser.parse_args(["--run_id", mock_run_id])
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
mock_workspace.get_run.return_value = MockRun(mock_run_id_2)
aml_runs = util.get_aml_runs_from_runids(mock_args, mock_workspace)
mock_workspace.get_run.return_value = MockRun(mock_run_id)
aml_run = util.get_aml_run_from_run_id_args(mock_args, aml_workspace=mock_workspace)
mock_workspace.get_run.assert_called_with(mock_run_id)
assert aml_run.id == mock_run_id
# Test that Exception is raised if run_id is not provided. Not necessary to mock the workspace.
mock_args = parser.parse_args([])
ws = DEFAULT_WORKSPACE.workspace
with pytest.raises(Exception):
util.get_aml_run_from_run_id_args(mock_args, aml_workspace=ws)
def test_get_aml_runs_from_run_ids() -> None:
parser = ArgumentParser()
parser.add_argument("--run_ids", nargs="+", default=[])
# assert correct number of runs is returned
mock_run_id = "run123"
mock_run_id_2 = "run456"
mock_args = parser.parse_args(["--run_ids", mock_run_id, mock_run_id_2])
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
mock_workspace.get_run.return_value = MockRun(mock_run_id_2) # both MockRuns will get this id
aml_runs = util.get_aml_runs_from_run_ids(mock_args, mock_workspace)
assert len(aml_runs) == 2
assert aml_runs[1].id == mock_run_id_2
# Test that Exception is raised if run_ids are not provided
mock_args = parser.parse_args([])
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
with mock.patch("health.azure.azure_util.fetch_run", _mock_get_most_recent_run):
with pytest.raises(Exception):
util.get_aml_runs_from_run_ids(mock_args, aml_workspace=mock_workspace)
def test_get_aml_runs(tmp_path: Path) -> None:
def test_get_aml_runs_file(tmp_path: Path) -> None:
parser = ArgumentParser()
mock_latest_run_path = tmp_path / "most_recent_run.txt"
parser.add_argument("--latest_run_file", type=str)
@ -596,43 +730,279 @@ def test_get_aml_runs(tmp_path: Path) -> None:
# if latest run path has been provided:
mock_args = parser.parse_args(["--latest_run_file", str(mock_latest_run_path)])
run_id_source = util.AzureRunIdSource.LATEST_RUN_FILE
with mock.patch("health.azure.azure_util.get_aml_runs_from_latest_run_file") as mock_get_from_run_path:
with mock.patch("health.azure.azure_util.get_aml_run_from_latest_run_file") as mock_get_from_run_path:
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
aml_runs = util.get_aml_runs(mock_args, mock_workspace, run_id_source)
_ = util.get_aml_runs(mock_args, mock_workspace, run_id_source)
mock_get_from_run_path.assert_called_once()
def test_get_aml_runs_experiment(tmp_path: Path) -> None:
parser = ArgumentParser()
# if experiment name has been provided:
parser.add_argument("--experiment_name", type=str)
mock_args = parser.parse_args(["--experiment_name", "mockExperiment"])
parser.add_argument("--experiment", type=str)
mock_args = parser.parse_args(["--experiment", "mockExperiment"])
run_id_source = util.AzureRunIdSource.EXPERIMENT_LATEST
with mock.patch("health.azure.azure_util.get_latest_aml_runs_from_experiment") as mock_get_from_experiment:
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
aml_runs = util.get_aml_runs(mock_args, mock_workspace, run_id_source)
_ = util.get_aml_runs(mock_args, mock_workspace, run_id_source)
mock_get_from_experiment.assert_called_once()
# if run_recovery_id has been provided:
parser.add_argument("--run_recovery_ids", action="append")
mock_args = parser.parse_args(["--run_recovery_ids", "experiment:run1234"])
run_id_source = util.AzureRunIdSource.RUN_RECOVERY_ID
with mock.patch("health.azure.azure_util.get_aml_runs_from_recovery_ids") as mock_get_from_recovery_ids:
def test_get_aml_runs_recovery_ids(tmp_path: Path) -> None:
parser = ArgumentParser()
# if run_recovery_ids has been provided:
parser.add_argument("--run_recovery_ids", nargs="+", default=[])
mock_args = parser.parse_args(["--run_recovery_ids", "experiment:run1234", "experiment:4321"])
run_id_source = util.AzureRunIdSource.RUN_RECOVERY_IDS
with mock.patch("health.azure.azure_util.get_aml_runs_from_recovery_ids",
return_value=[MockRun(), MockRun()]) as mock_get_from_recovery_ids:
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
aml_runs = util.get_aml_runs(mock_args, mock_workspace, run_id_source)
assert len(aml_runs) == 2
mock_get_from_recovery_ids.assert_called_once()
def test_get_aml_runs_recovery_id(tmp_path: Path) -> None:
parser = ArgumentParser()
# if run_recovery_id has been provided:
parser.add_argument("--run_recovery_id", type=str)
mock_args = parser.parse_args(["--run_recovery_id", "experiment:run1234"])
run_id_source = util.AzureRunIdSource.RUN_RECOVERY_ID
with mock.patch("health.azure.azure_util.get_aml_run_from_recovery_id") as mock_get_from_recovery_id:
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
_ = util.get_aml_runs(mock_args, mock_workspace, run_id_source)
mock_get_from_recovery_id.assert_called_once()
def test_get_aml_runs_run_ids(tmp_path: Path) -> None:
parser = ArgumentParser()
# if run_ids has been provided:
parser.add_argument("--run_ids", action="append")
mock_args = parser.parse_args(["--run_ids", "run1234"])
run_id_source = util.AzureRunIdSource.RUN_ID
with mock.patch("health.azure.azure_util.get_aml_runs_from_runids",
return_value=[MockRun()]) as mock_get_from_run_id:
parser.add_argument("--run_ids", nargs="+", default=[])
mock_args = parser.parse_args(["--run_ids", "run1234", "run5432"])
run_id_source = util.AzureRunIdSource.RUN_IDS
with mock.patch("health.azure.azure_util.get_aml_runs_from_run_ids",
return_value=[MockRun(), MockRun()]) as mock_get_from_run_ids:
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
aml_runs = util.get_aml_runs(mock_args, mock_workspace, run_id_source)
assert len(aml_runs) == 1
assert len(aml_runs) == 2
mock_get_from_run_ids.assert_called_once()
def test_get_aml_runs_run_id(tmp_path: Path) -> None:
parser = ArgumentParser()
# if run_id has been provided:
parser.add_argument("--run_id", type=str)
mock_args = parser.parse_args(["--run_id", "run1234"])
run_id_source = util.AzureRunIdSource.RUN_ID
with mock.patch("health.azure.azure_util.get_aml_run_from_run_id") as mock_get_from_run_id:
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
_ = util.get_aml_runs(mock_args, mock_workspace, run_id_source)
mock_get_from_run_id.assert_called_once()
def test_get_aml_runs_run_unknown_source(tmp_path: Path) -> None:
parser = ArgumentParser()
# otherwise assert Exception raised
mock_args = parser.parse_args([])
run_id_source = None
with pytest.raises(Exception):
with mock.patch("health.azure.azure_util.Workspace") as mock_workspace:
util.get_aml_runs(mock_args, mock_workspace, run_id_source) # type: ignore
def _get_file_names(pref: str = "") -> List[str]:
file_names = ["somepath.txt", "abc/someotherpath.txt", "abc/def/anotherpath.txt"]
if len(pref) > 0:
return [u for u in file_names if u.startswith(pref)]
else:
return file_names
def test_get_run_file_names() -> None:
with patch("azureml.core.Run") as mock_run:
expected_file_names = _get_file_names()
mock_run.get_file_names.return_value = expected_file_names
# check that we get the expected run paths if no filter is applied
run_paths = util.get_run_file_names(mock_run) # type: ignore
assert len(run_paths) == len(expected_file_names)
assert sorted(run_paths) == sorted(expected_file_names)
# Now check we get the expected run paths if a filter is applied
prefix = "abc"
run_paths = util.get_run_file_names(mock_run, prefix=prefix)
assert all([f.startswith(prefix) for f in run_paths])
def _mock_download_file(filename: str, output_file_path: Optional[str] = None,
_validate_checksum: bool = False) -> None:
"""
Creates an empty file at the given output_file_path
"""
output_file_path = 'test_output' if output_file_path is None else output_file_path
Path(output_file_path).touch(exist_ok=True)
@pytest.mark.parametrize("dummy_env_vars", [{}, {util.ENV_LOCAL_RANK: "1"}])
@pytest.mark.parametrize("prefix", ["", "abc"])
def test_download_run_files(tmp_path: Path, dummy_env_vars: Dict[Optional[str], Optional[str]], prefix: str) -> None:
# Assert that 'downloaded' paths don't exist to begin with
dummy_paths = [x[0] for x in _get_file_names(pref=prefix)]
expected_paths = [tmp_path / dummy_path for dummy_path in dummy_paths]
# Ensure that paths don't already exist
[p.unlink() for p in expected_paths if p.exists()] # type: ignore
assert not any([p.exists() for p in expected_paths])
mock_run = MockRun(run_id="id123")
with mock.patch.dict(os.environ, dummy_env_vars):
with patch("health.azure.azure_util.get_run_file_names") as mock_get_run_paths:
mock_get_run_paths.return_value = dummy_paths # type: ignore
mock_run.download_file = MagicMock() # type: ignore
mock_run.download_file.side_effect = _mock_download_file
util.download_run_files(mock_run, output_dir=tmp_path)
# First test the case where is_local_rank_zero returns True
if not any(dummy_env_vars):
# Check that our mocked download_run_file has been called once for each file
assert sum([p.exists() for p in expected_paths]) == len(expected_paths)
# Now test the case where is_local_rank_zero returns False - in this case nothing should be created
else:
assert not any([p.exists() for p in expected_paths])
@patch("health.azure.azure_util.get_workspace")
@patch("health.azure.azure_util.get_aml_run_from_run_id")
@patch("health.azure.azure_util.download_run_files")
def test_download_run_files_from_run_id(mock_download_run_files: MagicMock,
mock_get_aml_run_from_run_id: MagicMock,
mock_workspace: MagicMock) -> None:
mock_run = {"id": "run123"}
mock_get_aml_run_from_run_id.return_value = mock_run
util.download_run_files_from_run_id("run123", Path(__file__))
mock_download_run_files.assert_called_with(mock_run, Path(__file__), prefix="")
@pytest.mark.parametrize("dummy_env_vars, expect_file_downloaded", [({}, True), ({util.ENV_LOCAL_RANK: "1"}, False)])
@patch("azureml.core.Run", MockRun)
def test_download_run_file(tmp_path: Path, dummy_env_vars: Dict[str, str], expect_file_downloaded: bool) -> None:
dummy_filename = "filetodownload.txt"
expected_file_path = tmp_path / dummy_filename
# mock the method 'download_file' on the AML Run class and assert it gets called with the expected params
mock_run = MockRun(run_id="id123")
mock_run.download_file = MagicMock(return_value=None) # type: ignore
mock_run.download_file.side_effect = _mock_download_file
with mock.patch.dict(os.environ, dummy_env_vars):
util.download_run_file(mock_run, dummy_filename, expected_file_path)
if expect_file_downloaded:
mock_run.download_file.assert_called_with(dummy_filename, output_file_path=expected_file_path,
_validate_checksum=False)
assert expected_file_path.exists()
else:
assert not expected_file_path.exists()
def test_download_run_file_remote(tmp_path: Path) -> None:
# This test will create a Run in your workspace (using only local compute)
ws = DEFAULT_WORKSPACE.workspace
experiment = Experiment(ws, AML_TESTS_EXPERIMENT)
config = ScriptRunConfig(
source_directory=".",
command=["cd ."], # command that does nothing
compute_target="local"
)
run = experiment.submit(config)
file_to_upload = tmp_path / "dummy_file.txt"
file_contents = "Hello world"
file_to_upload.write_text(file_contents)
# This should store the file in outputs
run.upload_file("dummy_file", str(file_to_upload))
output_file_path = tmp_path / "downloaded_file.txt"
assert not output_file_path.exists()
start_time = time.perf_counter()
util.download_run_file(run, "dummy_file", output_file_path)
end_time = time.perf_counter()
time_dont_validate_checksum = end_time - start_time
assert output_file_path.exists()
assert output_file_path.read_text() == file_contents
# Now delete the file and try again with _validate_checksum == True
output_file_path.unlink()
assert not output_file_path.exists()
start_time = time.perf_counter()
util.download_run_file(run, "dummy_file", output_file_path, validate_checksum=True)
end_time = time.perf_counter()
time_validate_checksum = end_time - start_time
assert output_file_path.exists()
assert output_file_path.read_text() == file_contents
logging.info(f"Time to download file without checksum: {time_dont_validate_checksum} vs time with"
f"validation {time_validate_checksum}.")
def test_download_run_file_during_run(tmp_path: Path) -> None:
# This test will create a Run in your workspace (using only local compute)
expected_file_path = tmp_path / "azureml-logs"
# Check that at first the path to downloaded logs doesnt exist (will be created by the later test script)
assert not expected_file_path.exists()
ws = DEFAULT_WORKSPACE.workspace
# call the script here
extra_options = {
"imports": """
from azureml.core import Run
from health.azure.azure_util import download_run_files""",
"args": """
parser.add_argument("--output_path", type=str, required=True)
""",
"body": """
output_path = Path(args.output_path)
output_path.mkdir(exist_ok=True)
run_ctx = Run.get_context()
available_files = run_ctx.get_file_names()
first_file_name = available_files[0]
output_file_path = output_path / first_file_name
download_run_files(run_ctx, output_path)
run_ctx.download_file(first_file_name, output_file_path=output_file_path)
print(f"Downloaded file {first_file_name} to location {output_file_path}")
"""
}
extra_args = ["--output_path", 'outputs']
render_and_run_test_script(tmp_path, RunTarget.AZUREML, extra_options, extra_args, True)
run = util.get_most_recent_run(run_recovery_file=tmp_path / himl.RUN_RECOVERY_FILE,
workspace=ws)
assert run.status == "Completed"
def test_is_global_rank_zero() -> None:
with mock.patch.dict(os.environ, {util.ENV_NODE_RANK: "0", util.ENV_GLOBAL_RANK: "0", util.ENV_LOCAL_RANK: "0"}):
assert not util.is_global_rank_zero()
with mock.patch.dict(os.environ, {util.ENV_GLOBAL_RANK: "0", util.ENV_LOCAL_RANK: "0"}):
assert not util.is_global_rank_zero()
with mock.patch.dict(os.environ, {util.ENV_NODE_RANK: "0"}):
assert util.is_global_rank_zero()
def test_is_local_rank_zero() -> None:
# mock the environment variables
with mock.patch.dict(os.environ, {}):
assert util.is_local_rank_zero()
with mock.patch.dict(os.environ, {util.ENV_GLOBAL_RANK: "1", util.ENV_LOCAL_RANK: "1"}):
assert not util.is_local_rank_zero()

Просмотреть файл

@ -76,7 +76,7 @@ def render_test_script(entry_script_path: Path, extra_options: Dict[str, str],
default_options['aml_workspace'] = 'None'
default_options['workspace_config_file'] = workspace_config_file_arg
default_options['snapshot_root_directory'] = 'here'
default_options['conda_environment_file'] = f'Path("{str(environment_yaml_path)}")'
default_options['conda_environment_file'] = f'Path("{str(environment_yaml_path.as_posix())}")'
default_options['environment_variables'] = 'None'
default_options['pip_extra_index_url'] = '""'
default_options['private_pip_wheel_path'] = 'None'
@ -88,6 +88,7 @@ def render_test_script(entry_script_path: Path, extra_options: Dict[str, str],
default_options['wait_for_completion_show_output'] = 'True'
default_options['args'] = ''
default_options['body'] = ''
default_options['imports'] = ''
all_options = dict(default_options, **extra_options)

Просмотреть файл

@ -17,7 +17,9 @@ from argparse import ArgumentParser
from pathlib import Path
from health.azure.datasets import DatasetConfig
from health.azure.himl import submit_to_azure_if_needed, WORKSPACE_CONFIG_JSON
from health.azure import submit_to_azure_if_needed
from health.azure.azure_util import WORKSPACE_CONFIG_JSON
{{ imports }}
try:
hi_ml_dist = pkg_resources.get_distribution("hi-ml-azure")

Просмотреть файл

@ -25,17 +25,17 @@ def test_download_aml_run_args(tmp_path: Path) -> None:
'or run_id must be provided' in str(e)
def test_no_config_path() -> None:
def test_no_config_path(tmp_path: Path) -> None:
# if no config path exists, will fail
with pytest.raises(Exception) as e:
subprocess.Popen(["python", DOWNLOAD_SCRIPT_PATH, "--config_path", "idontexist"])
subprocess.Popen(["python", DOWNLOAD_SCRIPT_PATH, "--config_path", "idontexist", "--output_dir", str(tmp_path)])
assert "You must provide a config.json file in the root folder to connect" in str(e)
def test_download_aml_run_no_runs() -> None:
def test_download_aml_run_no_runs(tmp_path: Path) -> None:
# if no such run exists, will fail
with pytest.raises(Exception) as e:
subprocess.Popen(["python", DOWNLOAD_SCRIPT_PATH, "--run_id", "madeuprun"])
subprocess.Popen(["python", DOWNLOAD_SCRIPT_PATH, "--run_id", "madeuprun", "--output_dir", str(tmp_path)])
assert "was not found" in str(e)

Просмотреть файл

@ -28,7 +28,7 @@ from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
from azureml.train.hyperdrive import HyperDriveConfig
import health.azure.himl as himl
from health.azure.azure_util import EXPERIMENT_RUN_SEPARATOR, get_most_recent_run
from health.azure.azure_util import EXPERIMENT_RUN_SEPARATOR, get_most_recent_run, WORKSPACE_CONFIG_JSON, get_workspace
from health.azure.datasets import DatasetConfig, _input_dataset_key, _output_dataset_key, get_datastore
from testazure.test_data.make_tests import render_environment_yaml, render_test_script
from testazure.util import DEFAULT_DATASTORE, change_working_directory, check_config_json, repository_root
@ -258,31 +258,6 @@ def test_get_script_params() -> None:
assert expected_params == himl._get_script_params()
@pytest.mark.fast
@patch("health.azure.himl.Workspace.from_config")
@patch("health.azure.himl.get_authentication")
@patch("health.azure.himl.Workspace")
def test_get_workspace(
mock_workspace: mock.MagicMock,
mock_get_authentication: mock.MagicMock,
mock_from_config: mock.MagicMock,
tmp_path: Path) -> None:
workspace = himl.get_workspace(mock_workspace, None)
assert workspace == mock_workspace
mock_get_authentication.return_value = "auth"
_ = himl.get_workspace(None, Path(__file__))
mock_from_config.assert_called_once_with(path=__file__, auth="auth")
# Work off a temporary directory: No config file is present
with change_working_directory(tmp_path):
with pytest.raises(ValueError) as ex:
himl.get_workspace(None, None)
assert "No workspace config file given" in str(ex)
# Workspace config file is set to a file that does not exist
with pytest.raises(ValueError) as ex:
himl.get_workspace(None, workspace_config_path=tmp_path / "does_not_exist")
assert "Workspace config file does not exist" in str(ex)
@pytest.mark.fast
@patch("health.azure.himl.is_running_in_azure")
def test_get_workspace_no_config(
@ -484,25 +459,6 @@ def test_str_to_path(tmp_path: Path) -> None:
assert himl._str_to_path(str(tmp_path)) == tmp_path
@pytest.mark.fast
def test_find_file(tmp_path: Path) -> None:
file_name = "some_file.json"
file = tmp_path / file_name
file.touch()
python_root = tmp_path / "python_root"
python_root.mkdir(exist_ok=False)
start_path = python_root / "starting_directory"
start_path.mkdir(exist_ok=False)
where_are_we_now = Path.cwd()
os.chdir(start_path)
found_file = himl._find_file(file_name, False)
assert found_file
with mock.patch.dict(os.environ, {"PYTHONPATH": str(python_root.absolute())}):
found_file = himl._find_file(file_name)
assert not found_file
os.chdir(where_are_we_now)
# endregion Small fast local unit tests
@ -637,7 +593,7 @@ def render_and_run_test_script(path: Path,
else:
assert EXPECTED_QUEUED in captured
with check_config_json(path):
workspace = himl.get_workspace(aml_workspace=None, workspace_config_path=path / himl.WORKSPACE_CONFIG_JSON)
workspace = get_workspace(aml_workspace=None, workspace_config_path=path / WORKSPACE_CONFIG_JSON)
run = get_most_recent_run(run_recovery_file=path / himl.RUN_RECOVERY_FILE,
workspace=workspace)
@ -685,7 +641,7 @@ def test_invoking_hello_world_config(run_target: RunTarget, use_package: bool, t
Test that invoking hello_world.py elevates itself to AzureML with config.json.
Test against either the local src folder or a package. If running locally, ensure that there
are no whl's in the dist folder, or that will be used.
:param local: Local execution if True, else in AzureML.
:param run_target: Local execution if True, else in AzureML.
:param use_package: True to test against package, False to test against copy of src folder.
:param tmp_path: PyTest test fixture for temporary path.
"""
@ -835,8 +791,8 @@ def test_invoking_hello_world_datasets(run_target: RunTarget, tmp_path: Path) ->
# Get default datastore
with check_config_json(tmp_path):
workspace = himl.get_workspace(aml_workspace=None,
workspace_config_path=tmp_path / himl.WORKSPACE_CONFIG_JSON)
workspace = get_workspace(aml_workspace=None,
workspace_config_path=tmp_path / WORKSPACE_CONFIG_JSON)
datastore: AzureBlobDatastore = get_datastore(workspace=workspace,
datastore_name=DEFAULT_DATASTORE)

Просмотреть файл

@ -30,17 +30,11 @@ def test_run_tensorboard_args() -> None:
" or run_ids must be provided" in str(e)
def test_no_config_path() -> None:
# if no config path exists, will fail
with pytest.raises(Exception) as e:
subprocess.Popen(["python", TENSORBOARD_SCRIPT_PATH, "--config_path", "idontexist"])
assert "You must provide a config.json file in the root folder to connect" in str(e)
def test_run_tensorboard_no_runs() -> None:
def test_run_tensorboard_no_runs(tmp_path: Path) -> None:
# if no such run exists, will fail
with pytest.raises(Exception) as e:
subprocess.Popen(["python", TENSORBOARD_SCRIPT_PATH, "--run_recovery_ids", "madeuprun"])
subprocess.Popen(["python", TENSORBOARD_SCRIPT_PATH, "--run_recovery_ids", "madeuprun",
"--log_dir", str(tmp_path)])
assert "No runs were found" in str(e)

Просмотреть файл

@ -16,8 +16,7 @@ from typing import Generator
from azureml.core import Workspace
from health.azure.azure_util import (ENV_RESOURCE_GROUP, ENV_SUBSCRIPTION_ID, ENV_WORKSPACE_NAME, get_authentication,
get_secret_from_environment)
from health.azure.himl import WORKSPACE_CONFIG_JSON
get_secret_from_environment, WORKSPACE_CONFIG_JSON)
DEFAULT_DATASTORE = "himldatasets"
FALLBACK_SINGLE_RUN = "refs_pull_545_merge:refs_pull_545_merge_1626538212_d2b07afd"