Mirror of https://github.com/microsoft/hi-ml.git
ENH: Enable single-node MPI jobs with SDK v1 (#914)
Extend the `use_mpi_run_for_single_node_jobs` flag to SDK v1 jobs
This commit is contained in:
Parent: 1cf19cce04
Commit: c606808b20
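The flag `use_mpi_run_for_single_node_jobs` previously only took effect for jobs submitted via SDK v2; this change makes the v1 code path honour it as well, and flips its default to False. A minimal sketch of how a caller would opt in on the v1 path (the script name and cluster name are placeholder values, not part of this commit):

from health_azure import submit_to_azure_if_needed

# Sketch only: "train.py" and "gpu-cluster" are placeholder values.
run_info = submit_to_azure_if_needed(
    entry_script="train.py",
    compute_cluster_name="gpu-cluster",
    submit_to_azureml=True,
    strictly_aml_v1=True,  # take the AzureML SDK v1 code path
    use_mpi_run_for_single_node_jobs=True,  # now also honoured by v1 jobs
)

The diff below covers the submission module and its tests.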
@@ -160,6 +160,7 @@ def create_run_configuration(
     max_run_duration: str = "",
     input_datasets: Optional[List[DatasetConfig]] = None,
     output_datasets: Optional[List[DatasetConfig]] = None,
+    use_mpi_run_for_single_node_jobs: bool = False,
 ) -> RunConfiguration:
     """
     Creates an AzureML run configuration, that contains information about environment, multi node execution, and

@@ -188,6 +189,8 @@ def create_run_configuration(
     :param output_datasets: The script will create a temporary folder when running in AzureML, and while the job writes
         data to that folder, upload it to blob storage, in the data store.
     :param num_nodes: The number of nodes to use in distributed training on AzureML.
+    :param use_mpi_run_for_single_node_jobs: If True, even single node jobs will be run as distributed MPI jobs.
+        If False, single node jobs will not be run as distributed jobs.
     :return:
     """
     run_config = RunConfiguration()

@@ -225,9 +228,8 @@ def create_run_configuration(
     if max_run_duration:
         run_config.max_run_duration_seconds = run_duration_string_to_seconds(max_run_duration)
 
-    # Create MPI configuration for distributed jobs (unless num_splits > 1, in which case
-    # an AML HyperdriveConfig is instantiated instead
-    if num_nodes > 1:
+    # Create MPI configuration for distributed jobs
+    if num_nodes > 1 or use_mpi_run_for_single_node_jobs:
         distributed_job_config = MpiConfiguration(node_count=num_nodes)
         run_config.mpi = distributed_job_config
         run_config.framework = "Python"

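For a single-node v1 job with the flag set, the branch above now yields a run configuration along these lines (a sketch assuming the standard AzureML v1 API; `node_count` stays at 1 because `num_nodes` defaults to 1):

from azureml.core.runconfig import MpiConfiguration, RunConfiguration

# Sketch of what the changed branch produces for num_nodes == 1 with
# use_mpi_run_for_single_node_jobs == True; not the library code itself.
run_config = RunConfiguration()
run_config.mpi = MpiConfiguration(node_count=1)  # one node, one MPI process
run_config.framework = "Python"
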
@@ -483,7 +485,7 @@ def submit_run_v2(
     # On AML managed compute, we can set distribution to None for single node jobs.
     # However, on Kubernetes compute, single node jobs don't see any GPUs. GPUs are visible for MpiDistribution
     # jobs, so we set MpiDistribution even for single node jobs.
-    if use_mpi_run_for_single_node_jobs:
+    if num_nodes > 1 or use_mpi_run_for_single_node_jobs:
         distribution = MpiDistribution(process_count_per_instance=1)
     else:
         distribution = PyTorchDistribution(process_count_per_instance=pytorch_processes_per_node)

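On the SDK v2 path, the same condition picks the `distribution` argument that is eventually handed to `azure.ai.ml.command`. A sketch of the resulting call for a single-node job with the flag set (code path, command, environment, and compute names are all placeholder values):

from azure.ai.ml import MpiDistribution, command

# Sketch: single-node SDK v2 job with use_mpi_run_for_single_node_jobs=True.
# All string arguments below are placeholder values.
job = command(
    code="src",
    command="python train.py",
    environment="my-env@latest",
    compute="k8s-cluster",
    instance_count=1,
    distribution=MpiDistribution(process_count_per_instance=1),
)
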
@@ -741,7 +743,7 @@ def submit_to_azure_if_needed(  # type: ignore
     strictly_aml_v1: bool = False,
     identity_based_auth: bool = False,
     pytorch_processes_per_node_v2: Optional[int] = None,
-    use_mpi_run_for_single_node_jobs: bool = True,
+    use_mpi_run_for_single_node_jobs: bool = False,
     display_name: Optional[str] = None,
     entry_command: Optional[PathOrString] = None,
 ) -> AzureRunInfo:  # pragma: no cover

@@ -814,9 +816,9 @@ def submit_to_azure_if_needed(  # type: ignore
     :param pytorch_processes_per_node_v2: For plain PyTorch multi-GPU processing: The number of processes per node. This
         is only supported with AML SDK v2, and ignored in v1. If supplied, the job will be submitted as using the
         "pytorch" framework (rather than "Python"), and using "nccl" as the communication backend.
-    :param use_mpi_run_for_single_node_jobs: If True, even single node jobs with SDK v2 will be run as distributed MPI
-        jobs. This is required for Kubernetes compute. If False, single node jobs will not be run as distributed jobs.
-        This setting only affects jobs submitted with SDK v2 (when `strictly_aml_v1=False`)
+    :param use_mpi_run_for_single_node_jobs: If True, even single node jobs will be run as distributed MPI
+        jobs. If False, single node jobs will not be run as distributed jobs.
+        Setting this flag to True is required for Kubernetes compute.
     :param display_name: The name for the run that will be displayed in the AML UI. If not provided, a random
         display name will be generated by AzureML.
     :return: If the script is submitted to AzureML then we terminate python as the script should be executed in AzureML,

@@ -926,6 +928,7 @@ def submit_to_azure_if_needed(  # type: ignore
         max_run_duration=max_run_duration,
         input_datasets=cleaned_input_datasets,
         output_datasets=cleaned_output_datasets,
+        use_mpi_run_for_single_node_jobs=use_mpi_run_for_single_node_jobs,
     )
 
     script_run_config = create_script_run(

@@ -33,6 +33,7 @@ from azureml._restclient.constants import RunStatus
 from azureml.core import ComputeTarget, Environment, RunConfiguration, ScriptRunConfig, Workspace
 from azureml.data.azure_storage_datastore import AzureBlobDatastore
 from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
+from azureml.core.runconfig import MpiConfiguration
 from azureml.dataprep.fuse.daemon import MountContext
 from azureml.train.hyperdrive import HyperDriveConfig
 
@@ -2100,8 +2101,7 @@ def test_submit_to_azure_v2_distributed() -> None:
     assert call_kwargs.get("num_nodes") == num_nodes
     assert call_kwargs.get("pytorch_processes_per_node") == processes_per_node
 
-    # Single node job: The "distribution" argument of "command" should be set to MpiRun, to ensure that it
-    # runs fine on Kubernetes compute.
+    # Single node job: The "distribution" argument of "command" should be set to None
     with patch("health_azure.himl.command") as mock_command:
         _ = himl.submit_to_azure_if_needed(
             workspace_config_file="mockconfig.json",

@@ -2113,6 +2113,22 @@ def test_submit_to_azure_v2_distributed() -> None:
     mock_command.assert_called_once()
     _, call_kwargs = mock_command.call_args
     assert call_kwargs.get("instance_count") == 1
+    assert call_kwargs.get("distribution") is None
+
+    # Single node job where we set the flag to use MPI runs nevertheless: The "distribution" argument of
+    # "command" should be set to MpiRun, to ensure that it runs fine on Kubernetes compute.
+    with patch("health_azure.himl.command") as mock_command:
+        _ = himl.submit_to_azure_if_needed(
+            workspace_config_file="mockconfig.json",
+            entry_script=Path(__file__),
+            snapshot_root_directory=Path.cwd(),
+            submit_to_azureml=True,
+            strictly_aml_v1=False,
+            use_mpi_run_for_single_node_jobs=True,
+        )
+    mock_command.assert_called_once()
+    _, call_kwargs = mock_command.call_args
+    assert call_kwargs.get("instance_count") == 1
     assert call_kwargs.get("distribution") == MpiDistribution(process_count_per_instance=1)
 
     # Single node job: The "distribution" argument of "command" should be set to None if we are passing a flag

@@ -2217,3 +2233,31 @@ def test_extract_v2_data_asset_from_env_vars() -> None:
         himl._extract_v2_data_asset_from_env_vars(i, "INPUT_") for i in range(len(valid_mock_environment))
     ]
     assert input_datasets == [Path("input_0"), Path("input_1"), Path("input_2"), Path("input_3")]
+
+
+@pytest.mark.fast
+@pytest.mark.parametrize("use_mpi_run_for_single_node_jobs", [True, False])
+def test_mpi_for_single_node_jobs_v1(use_mpi_run_for_single_node_jobs: bool) -> None:
+    """Test if we can create an MPI job with a single node using the v1 SDK."""
+    with (
+        patch("health_azure.himl.submit_run") as mock_submit_run,
+        patch("health_azure.himl.register_environment"),
+        patch("health_azure.himl.validate_compute_cluster"),
+    ):
+        with pytest.raises(SystemExit):
+            himl.submit_to_azure_if_needed(
+                aml_workspace=MagicMock(name="workspace"),
+                submit_to_azureml=True,
+                strictly_aml_v1=True,
+                use_mpi_run_for_single_node_jobs=use_mpi_run_for_single_node_jobs,
+                entry_script="foo",
+            )
+        mock_submit_run.assert_called_once()
+        run_config = mock_submit_run.call_args[1]["script_run_config"].run_config
+        assert run_config.node_count == 1
+        assert isinstance(run_config.mpi, MpiConfiguration)
+        assert run_config.mpi.node_count == 1
+        if use_mpi_run_for_single_node_jobs:
+            assert run_config.communicator == "IntelMpi"
+        else:
+            assert run_config.communicator == "None"
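The new test exercises the full v1 submission path with mocks; the same behaviour can be checked directly on the helper. A sketch, assuming a workspace config file is available and that `create_run_configuration` takes the workspace and cluster name as its first arguments (the cluster name is a placeholder):

from azureml.core import Workspace
from health_azure.himl import create_run_configuration

# Sketch: inspect the run configuration the v1 helper now builds for a
# single-node MPI job. Workspace and cluster name are placeholders.
workspace = Workspace.from_config()
run_config = create_run_configuration(
    workspace=workspace,
    compute_cluster_name="gpu-cluster",
    use_mpi_run_for_single_node_jobs=True,
)
assert run_config.mpi.node_count == 1
assert run_config.framework == "Python"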