[Enhancement]: Multi-node support for MaaS (#3279)

* [Enhancement] : Multi-node support for MaaS

* corrected change related to model sel. number of train epochs

* cleaned up unused variable

* formatting updates

* correct string formats

* string corrections

* spaces and indent cleanup

* fixed line-length constraint violations

* removed trailing whitespace

* added doc strings

* format for line constraint

* correct doc string

* doc string fix

* ENH : Portalocker on wait barrier (see the barrier sketch after this list)

* ENH : Process name optimized

* ENH : Changes as per review comments

* ENH : Fail if input value is not a number
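
The "Portalocker on wait barrier" item refers to a file-based barrier shared by the worker processes. Below is a minimal sketch of that pattern, assuming the portalocker package and a filesystem visible to all workers; the function and variable names here are illustrative only, and the real implementation is in the diff further down.

    import os
    import time
    import portalocker

    def file_barrier(path, expected_processes):
        # Register this process: append one line under an exclusive lock.
        with open(path, 'a+') as f:
            portalocker.lock(f, portalocker.LOCK_EX)
            f.write(f"{os.getpid()} reached the barrier\n")
            f.flush()
            portalocker.unlock(f)
        # Poll until every expected process has registered a line.
        while True:
            with open(path, 'r') as f:
                if len(f.readlines()) >= expected_processes:
                    return
            time.sleep(0.5)  # polling interval
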
This commit is contained in:
Yeshwanth N 2024-08-27 14:34:49 +05:30 committed by GitHub
Parent 839235c223
Commit a09b728203
No key found matching this signature
GPG key ID: B5690EEEBB952194
1 changed file with 140 additions and 27 deletions


@@ -6,9 +6,11 @@
import os
import subprocess
import logging
import portalocker
import json
from pathlib import Path
import shutil
import time
from dataclasses import dataclass, field, fields
from typing import Optional, List
@@ -367,32 +369,117 @@ def add_task_specific_params(cmd: List[str], task_name: str, component_name: str
add_optional_input(cmd, param)
def is_main_process():
"""
Check whether the process running this code is the master (rank_0) process.
:return: Boolean for whether this process is the master.
"""
return os.environ.get('AZUREML_PROCESS_NAME', 'main') in {'main', 'rank_0'}
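# Example (hypothetical values): AZUREML_PROCESS_NAME='rank_0', or the variable being unset
# (defaulting to 'main'), makes is_main_process() return True; 'rank_1' returns False.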
def wait_at_barrier(barrier_file, num_processes):
"""
Block until the given number of processes has reached this execution point.
barrier_file: File used to create the execution barrier.
num_processes: Number of processes which need to reach the barrier point.
"""
process_name = os.environ.get('AZUREML_PROCESS_NAME', 'main')
with open(barrier_file, 'a+') as f:
portalocker.lock(f, portalocker.LOCK_EX)
try:
f.seek(0)
lines = f.readlines()
process_count = len(lines)
if process_count < num_processes:
f.write(f"{os.getpid()} reached the barrier\n")
f.flush()
os.fsync(f.fileno())
portalocker.unlock(f)
logger.info(f'Process {process_name} at barrier, count is {process_count},')
logger.info(f'in barrier file {barrier_file}')
finally:
portalocker.unlock(f)
with open(barrier_file, 'r') as f:
while len(lines) < num_processes:
portalocker.lock(f, portalocker.LOCK_EX)
f.seek(0)
lines = f.readlines()
portalocker.unlock(f)
time.sleep(0.5) # Polling interval
logger.info(f"Process {os.getpid()} has passed barrier with name {process_name}")
def _run_subprocess_cmd(cmd: List[str], component_name: str, completion_files_folder: str,
single_run=True, number_of_processes=1):
"""Run the subprocess command."""
logger.info(f"Starting the command: {cmd}")
completion_file = Path(completion_files_folder, f"{component_name}.complete.txt")
barrier_file = Path(completion_files_folder, f"{component_name}.barrier.txt")
Path(completion_files_folder).mkdir(parents=True, exist_ok=True)
if completion_file.exists():
logger.info(f"Skipping {component_name} as completion file exists: {completion_file}")
return
process_name = os.environ.get('AZUREML_PROCESS_NAME', 'main')
if not barrier_file.exists():
Path(barrier_file).touch()
logger.info(f'Barrier file {barrier_file} is created by process name {process_name}')
if single_run:
if is_main_process():
logger.info(f"Executing command: {cmd} in single run mode. Process name is {process_name}")
# Not setting stdout and stderr will stream all the logs directly to stdout
process = subprocess.Popen(cmd)
# get the return code
return_code = process.wait()
if return_code != 0:
intermediate_folder = decode_output_from_env_var("intermediate_folder")
completion_files_folder = os.path.join(intermediate_folder, "completion_files")
shutil.rmtree(completion_files_folder, ignore_errors=True)
raise ACFTValidationException._with_error(
AzureMLError.create(
ACFTUserError,
pii_safe_message=(
f"{component_name} failed"
)
)
)
logger.info(f"{component_name} completed successfully")
Path(completion_file).touch()
logger.info(f"Created completion file: {completion_file}")
logger.info(f"Waiting for completion file: {completion_file}, by rank : {process_name}")
while not completion_file.exists():
if is_main_process():
time.sleep(1)
else:
pass
logger.info(f"Process name on subprocess entering count barrier : {process_name}")
wait_at_barrier(barrier_file, number_of_processes)
logger.info(f"Process name on subprocess exiting count barrier : {process_name}")
else:
logger.info(f"Executing the command: {cmd} in multi-process mode.")
# Not setting stdout and stderr will stream all the logs directly to stdout
process = subprocess.Popen(cmd)
# get the return code
return_code = process.wait()
if return_code != 0:
intermediate_folder = decode_output_from_env_var("intermediate_folder")
completion_files_folder = os.path.join(intermediate_folder, "completion_files")
shutil.rmtree(completion_files_folder, ignore_errors=True)
raise ACFTValidationException._with_error(
AzureMLError.create(
ACFTUserError,
pii_safe_message=(
f"{component_name} failed"
)
)
)
logger.info(f"{component_name} completed successfully")
Path(completion_files_folder).mkdir(parents=True, exist_ok=True)
Path(completion_file).touch()
logger.info(f"Created completion file: {completion_file}")
logger.info(f"{component_name} completed successfully")
Path(completion_files_folder).mkdir(parents=True, exist_ok=True)
Path(completion_file).touch()
logger.info(f"Created completion file: {completion_file}")
def cleanup(completion_files_folder: str, model_selector_output: str,
@@ -429,8 +516,29 @@ def initiate_run():
_initiate_run(completion_files_folder, model_selector_output,
preprocess_output, pytorch_model_folder, mlflow_model_folder)
if is_main_process():
cleanup(completion_files_folder, model_selector_output,
preprocess_output, pytorch_model_folder, mlflow_model_folder)
def parse_to_int(s):
"""
Parse a string to an integer, raising a user error if the value is not a number.
s: String which needs to be parsed to an integer.
"""
try:
return int(s)
except ValueError:
raise ACFTValidationException._with_error(
AzureMLError.create(
ACFTUserError,
pii_safe_message=(
f"Invalid value {s} entered, it should be a number."
)
)
)
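# For example: parse_to_int("2") returns 2, while parse_to_int("two") raises the
# ACFTUserError above instead of silently falling back to a default value.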
def _initiate_run(completion_files_folder: str, model_selector_output: str,
@@ -438,6 +546,9 @@ def _initiate_run(completion_files_folder: str, model_selector_output: str,
"""Run the model selector, preprocess, finetune and registration script."""
# get task name
task_name = decode_param_from_env_var("task_name")
nodes = parse_to_int(decode_param_from_env_var("Node_Count"))
gpus = parse_to_int(decode_param_from_env_var("number_of_gpu_to_use_finetuning"))
logger.info(f'Nodes are {nodes} , gpus are : {gpus}')
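# Hypothetical values: Node_Count="2" and number_of_gpu_to_use_finetuning="8" give
# nodes == 2 and gpus == 8; gpus is forwarded as number_of_processes to each
# _run_subprocess_cmd call below.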
# model selector
cmd = [
@@ -447,8 +558,8 @@ def _initiate_run(completion_files_folder: str, model_selector_output: str,
]
add_optional_input(cmd, "mlflow_model_path")
add_optional_input(cmd, "pytorch_model_path")
_run_subprocess_cmd(cmd, component_name="model_selector", completion_files_folder=completion_files_folder)
_run_subprocess_cmd(cmd, component_name="model_selector", completion_files_folder=completion_files_folder,
single_run=True, number_of_processes=gpus)
# preprocess
cmd = [
"python", "-m", "azureml.acft.contrib.hf.nlp.entry_point.finetune.preprocess",
@@ -469,13 +580,12 @@ def _initiate_run(completion_files_folder: str, model_selector_output: str,
if os.path.isfile(validation_file_path):
cmd += ["--validation_file_path", validation_file_path]
_run_subprocess_cmd(cmd, component_name="preprocess", completion_files_folder=completion_files_folder)
_run_subprocess_cmd(cmd, component_name="preprocess", completion_files_folder=completion_files_folder,
single_run=True, number_of_processes=gpus)
# finetune
cmd = [
"python", "-m", "torch.distributed.launch",
"--nproc_per_node", decode_param_from_env_var('number_of_gpu_to_use_finetuning'),
"-m", "azureml.acft.contrib.hf.nlp.entry_point.finetune.finetune",
"python", "-m", "azureml.acft.contrib.hf.nlp.entry_point.finetune.finetune",
"--apply_lora", decode_param_from_env_var('apply_lora'),
"--merge_lora_weights", decode_param_from_env_var('merge_lora_weights'),
"--lora_alpha", decode_param_from_env_var('lora_alpha'),
@@ -525,7 +635,8 @@ def _initiate_run(completion_files_folder: str, model_selector_output: str,
"--mlflow_model_folder", mlflow_model_folder,
"--output_model", decode_output_from_env_var('output_model')
]
_run_subprocess_cmd(cmd, component_name="finetune", completion_files_folder=completion_files_folder)
_run_subprocess_cmd(cmd, component_name="finetune", completion_files_folder=completion_files_folder,
single_run=False, number_of_processes=gpus)
# validate lora weights
@@ -543,7 +654,8 @@ def _initiate_run(completion_files_folder: str, model_selector_output: str,
"--train_file_path", os.path.join(decode_input_from_env_var("dataset_input") or "", "train_input.jsonl"),
]
add_task_specific_params(cmd, task_name, component_name="validate_lora_weights")
_run_subprocess_cmd(cmd, component_name="validate_lora_weights", completion_files_folder=completion_files_folder)
_run_subprocess_cmd(cmd, component_name="validate_lora_weights", completion_files_folder=completion_files_folder,
single_run=True, number_of_processes=gpus)
# model registration
cmd = [
@@ -558,7 +670,8 @@ def _initiate_run(completion_files_folder: str, model_selector_output: str,
"--convert_to_safetensors", "true",
]
add_optional_param(cmd=cmd, component_param_name="registered_model_name", argparse_param_name="model_name")
_run_subprocess_cmd(cmd, component_name="register_model", completion_files_folder=completion_files_folder)
_run_subprocess_cmd(cmd, component_name="register_model", completion_files_folder=completion_files_folder,
single_run=True, number_of_processes=gpus)
@swallow_all_exceptions(time_delay=60)