diff --git a/.github/workflows/nv-inference.yml b/.github/workflows/nv-inference.yml
index fbb4a54f5..1df814269 100644
--- a/.github/workflows/nv-inference.yml
+++ b/.github/workflows/nv-inference.yml
@@ -17,7 +17,7 @@ concurrency:
 jobs:
   unit-tests:
-    runs-on: [self-hosted, nvidia, cu111, v100]
+    runs-on: [self-hosted, nvidia, cu113, v100]
 
     steps:
       - uses: actions/checkout@v2
@@ -31,7 +31,7 @@ jobs:
         nvcc --version
         pip install --upgrade pip
         pip uninstall --yes torch torchvision
-        pip install torch==1.8.2+cu111 torchvision==0.9.2+cu111 -f https://download.pytorch.org/whl/lts/1.8/torch_lts.html
+        pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cu113
         python -c "import torch; print('torch:', torch.__version__, torch)"
         python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@@ -60,4 +60,5 @@ jobs:
         unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
         if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
         cd tests
-        TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -m 'inference' unit/ --torch_ver="1.8" --cuda_ver="11.1"
+        EXPECTED_TORCH=$(pip index versions torch | grep -oP -m1 "^\s*LATEST.*\s\K\d+\.\d+")
+        TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -m 'inference' unit/ --torch_ver=$EXPECTED_TORCH --cuda_ver="11.3"
diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml
index fd6859c7f..028b05b43 100644
--- a/.github/workflows/nv-torch-latest-v100.yml
+++ b/.github/workflows/nv-torch-latest-v100.yml
@@ -60,5 +60,6 @@ jobs:
         unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
         if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
         cd tests
-        TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -n 4 unit/
-        TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -m 'sequential' unit/
+        EXPECTED_TORCH=$(pip index versions torch | grep -oP -m1 "^\s*LATEST.*\s\K\d+\.\d+")
+        TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -n 4 unit/ --torch_ver=$EXPECTED_TORCH --cuda_ver="11.3"
+        TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -m 'sequential' unit/ --torch_ver=$EXPECTED_TORCH --cuda_ver="11.3"
diff --git a/.github/workflows/nv-transformers-v100.yml b/.github/workflows/nv-transformers-v100.yml
index efbd015ce..bf8ceca53 100644
--- a/.github/workflows/nv-transformers-v100.yml
+++ b/.github/workflows/nv-transformers-v100.yml
@@ -62,4 +62,4 @@ jobs:
         # force protobuf version due to issues
         pip install "protobuf<4.21.0"
         pip list
-        TORCH_EXTENSIONS_DIR=./torch-extensions RUN_SLOW=1 pytest --color=yes --durations=0 --verbose tests/deepspeed
+        WANDB_DISABLED=true TORCH_EXTENSIONS_DIR=./torch-extensions RUN_SLOW=1 pytest --color=yes --durations=0 --verbose tests/deepspeed
diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt
index d0258ff66..e3801f08c 100644
--- a/requirements/requirements-dev.txt
+++ b/requirements/requirements-dev.txt
@@ -1,5 +1,6 @@
 clang-format
 docutils<0.18
+future
 importlib-metadata>=4
 megatron-lm==1.1.5
 pre-commit
@@ -10,5 +11,7 @@ pytest-xdist
 recommonmark
 sphinx
 sphinx-rtd-theme
+tensorboard
 torchvision
 transformers
+wandb
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/conftest.py b/tests/conftest.py
index 4d4f23afe..6f1831c53 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -43,3 +43,14 @@ def check_environment(pytestconfig):
         pytest.exit(
             f"expected cuda version {expected_cuda_version} did not match found cuda version {torch.version.cuda}",
             returncode=2)
+
+
+# Override of pytest "runtest" for DistributedTest class
+# This hook is run before the default pytest_runtest_call
+@pytest.hookimpl(tryfirst=True)
+def pytest_runtest_call(item):
+    # We want to use our own launching function for distributed tests
+    if getattr(item.cls, "is_dist_test", False):
+        dist_test_class = item.cls()
+        dist_test_class._run_test(item._request)
+        item.runtest = lambda: True  # Dummy function so test is not run twice
diff --git a/tests/unit/comm/test_dist.py b/tests/unit/comm/test_dist.py
new file mode 100644
index 000000000..34a86a9e0
--- /dev/null
+++ b/tests/unit/comm/test_dist.py
@@ -0,0 +1,62 @@
+import torch
+import deepspeed.comm as dist
+
+from tests.unit.common import DistributedTest
+
+import pytest
+
+
+class TestInit(DistributedTest):
+    world_size = 3
+
+    def test(self):
+        assert dist.is_initialized()
+        assert dist.get_world_size() == 3
+        assert dist.get_rank() < 3
+
+
+# Demonstration of pytest's parameterization and fixtures
+@pytest.fixture(params=["hello"])
+def greeting(request):
+    return request.param
+
+
+@pytest.mark.parametrize("number,color", [(1138, "purple")])
+class TestDistArgs(DistributedTest):
+    world_size = 2
+    """ Classes that use DistributedTest class must define a test* method """
+    @pytest.mark.parametrize("shape", ["icosahedron"])
+    def test(self, number, color, shape, greeting):
+        """Ensure that we can parse args to DistributedTest methods. """
+        assert dist.get_world_size() == 2
+        assert number == 1138
+        assert color == "purple"
+        assert shape == "icosahedron"
+        assert greeting == "hello"
+
+
+# Demonstration of distributed tests grouped in single class
+@pytest.mark.parametrize("number", [1138])
+class TestGroupedDistTest(DistributedTest):
+    world_size = 2
+
+    def test_one(self, number):
+        assert dist.get_world_size() == 2
+        assert number == 1138
+
+    @pytest.mark.parametrize("color", ["purple"])
+    def test_two(self, number, color):
+        assert dist.get_world_size() == 2
+        assert number == 1138
+        assert color == "purple"
+
+
+class TestDistAllReduce(DistributedTest):
+    world_size = [1, 2, 4]
+
+    def test(self):
+        x = torch.ones(1, 3).cuda() * (dist.get_rank() + 1)
+        sum_of_ranks = (dist.get_world_size() * (dist.get_world_size() + 1)) // 2
+        result = torch.ones(1, 3).cuda() * sum_of_ranks
+        dist.all_reduce(x)
+        assert torch.all(x == result)
diff --git a/tests/unit/common.py b/tests/unit/common.py
index 7a60cbe8c..7b418ebbf 100644
--- a/tests/unit/common.py
+++ b/tests/unit/common.py
@@ -1,15 +1,17 @@
 import os
 import time
+import inspect
+from abc import ABC
+from pathlib import Path
 
 import torch
+import torch.multiprocessing as mp
+import deepspeed
 import deepspeed.comm as dist
 from torch.multiprocessing import Process
 
-import deepspeed
-
 import pytest
-
-from pathlib import Path
+from _pytest.outcomes import Skipped
 
 # Worker timeout *after* the first worker has completed.
 DEEPSPEED_UNIT_WORKER_TIMEOUT = 120
@@ -60,6 +62,108 @@ def set_cuda_visibile():
     os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(dev_id_list)
 
 
+class DistributedTest(ABC):
+    is_dist_test = True
+    world_size = 2
+    backend = "nccl"
+
+    def _run_test(self, request):
+        self.current_test = self._get_current_test_func(request)
+        self.test_kwargs = self._get_test_kwargs(request)
+        if isinstance(self.world_size, int):
+            self.world_size = [self.world_size]
+        for procs in self.world_size:
+            self._launch_procs(procs)
+            time.sleep(0.5)
+
+    def _get_current_test_func(self, request):
+        # DistributedTest subclasses may have multiple test methods
+        func_name = request.function.__name__
+        return getattr(self, func_name)
+
+    def _get_test_kwargs(self, request):
+        # Grab fixture / parametrize kwargs from pytest request object
+        test_kwargs = {}
+        params = inspect.getfullargspec(self.current_test).args
+        params.remove("self")
+        for p in params:
+            test_kwargs[p] = request.getfixturevalue(p)
+        return test_kwargs
+
+    def _launch_procs(self, num_procs):
+        mp.set_start_method('forkserver', force=True)
+        skip_msg = mp.Queue()  # Allows forked processes to share pytest.skip reason
+        processes = []
+        for local_rank in range(num_procs):
+            p = Process(target=self._dist_init, args=(local_rank, num_procs, skip_msg))
+            p.start()
+            processes.append(p)
+
+        # Now loop and wait for a test to complete. The spin-wait here isn't a big
+        # deal because the number of processes will be O(#GPUs) << O(#CPUs).
+        any_done = False
+        while not any_done:
+            for p in processes:
+                if not p.is_alive():
+                    any_done = True
+                    break
+
+        # Wait for all other processes to complete
+        for p in processes:
+            p.join(DEEPSPEED_UNIT_WORKER_TIMEOUT)
+
+        failed = [(rank, p) for rank, p in enumerate(processes) if p.exitcode != 0]
+        for rank, p in failed:
+            # If it still hasn't terminated, kill it because it hung.
+            if p.exitcode is None:
+                p.terminate()
+                pytest.fail(f'Worker {rank} hung.', pytrace=False)
+            if p.exitcode < 0:
+                pytest.fail(f'Worker {rank} killed by signal {-p.exitcode}',
+                            pytrace=False)
+            if p.exitcode > 0:
+                pytest.fail(f'Worker {rank} exited with code {p.exitcode}',
+                            pytrace=False)
+
+        if not skip_msg.empty():
+            # This assumes all skip messages are the same; it may be useful to
+            # add a check here to assert all exit messages are equal
+            pytest.skip(skip_msg.get())
+
+    def _dist_init(self, local_rank, num_procs, skip_msg):
+        """Initialize deepspeed.comm and execute the user function. """
+        os.environ['MASTER_ADDR'] = '127.0.0.1'
+        os.environ['MASTER_PORT'] = get_master_port()
+        os.environ['LOCAL_RANK'] = str(local_rank)
+        # NOTE: unit tests don't support multi-node so local_rank == global rank
+        os.environ['RANK'] = str(local_rank)
+        os.environ['WORLD_SIZE'] = str(num_procs)
+
+        # turn off NCCL logging if set
+        os.environ.pop('NCCL_DEBUG', None)
+
+        set_cuda_visibile()
+
+        deepspeed.init_distributed(dist_backend=self.backend)
+        dist.barrier()
+
+        if torch.cuda.is_available():
+            torch.cuda.set_device(local_rank)
+
+        try:
+            self.current_test(**self.test_kwargs)
+        except BaseException as e:
+            if isinstance(e, Skipped):
+                skip_msg.put(e.msg)
+            else:
+                raise e
+
+        # make sure all ranks finish at the same time
+        dist.barrier()
+        # tear down after test completes
+        dist.destroy_process_group()
+
+
 def distributed_test(world_size=2, backend='nccl'):
     """A decorator for executing a function (e.g., a unit test) in a distributed
     manner.
     This decorator manages the spawning and joining of processes, initialization of
diff --git a/tests/unit/test_inference.py b/tests/unit/inference/test_inference.py
similarity index 82%
rename from tests/unit/test_inference.py
rename to tests/unit/inference/test_inference.py
index 90586dee1..09fcd0736 100644
--- a/tests/unit/test_inference.py
+++ b/tests/unit/inference/test_inference.py
@@ -5,13 +5,15 @@ import pytest
 import itertools
 import deepspeed
 from deepspeed.git_version_info import torch_info
-from .common import distributed_test
+from tests.unit.common import DistributedTest
 from packaging import version as pkg_version
 from deepspeed.ops.op_builder import OpBuilder
 from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
 from huggingface_hub import HfApi
 
 
+# Fixture avoids problems with missing imports when pytest collects tests
+# during non-inference runs
 @pytest.fixture(scope="module", autouse=True)
 def lm_eval_imports():
     global lm_eval
@@ -159,29 +161,41 @@ def inf_kwargs(model_w_task):
         return {}
 
 
+def fill_mask_assert(x, y):
+    return set(res["token_str"] for res in x) == set(res["token_str"] for res in y)
+
+
+def question_answering_assert(x, y):
+    return x["answer"] == y["answer"]
+
+
+def text_classification_assert(x, y):
+    return set(res["label"] for res in x) == set(res["label"] for res in y)
+
+
+def token_classification_assert(x, y):
+    return set(ent["word"] for ent in x) == set(ent["word"] for ent in y)
+
+
+def text_generation_assert(x, y):
+    return set(res["generated_text"] for res in x) == set(res["generated_text"]
+                                                          for res in y)
+
+
 @pytest.fixture
 def assert_fn(model_w_task):
     model, task = model_w_task
-    if task == "fill-mask":
-        return lambda x, y: set(res["token_str"] for res in x) == set(
-            res["token_str"] for res in y
-        )
-    elif task == "question-answering":
-        return lambda x, y: x["answer"] == y["answer"]
-    elif task == "text-classification":
-        return lambda x, y: set(res["label"] for res in x) == set(
-            res["label"] for res in y
-        )
-    elif task == "token-classification":
-        return lambda x, y: set(ent["word"] for ent in x) == set(
-            ent["word"] for ent in y
-        )
-    elif task == "text-generation":
-        return lambda x, y: set(res["generated_text"] for res in x) == set(
-            res["generated_text"] for res in y
-        )
-    else:
-        NotImplementedError(f'assert_fn for task "{task}" is not implemented')
+    assert_fn_dict = {
+        "fill-mask": fill_mask_assert,
+        "question-answering": question_answering_assert,
+        "text-classification": text_classification_assert,
+        "token-classification": token_classification_assert,
+        "text-generation": text_generation_assert,
+    }
+    assert_fn = assert_fn_dict.get(task, None)
+    if assert_fn is None:
+        raise NotImplementedError(f'assert_fn for task "{task}" is not implemented')
+    return assert_fn
 
 
 """
@@ -190,22 +204,23 @@ Tests
 @pytest.mark.inference
-def test_model_task(
-    model_w_task,
-    dtype,
-    enable_cuda_graph,
-    query,
-    inf_kwargs,
-    assert_fn,
-    invalid_model_task_config,
-):
-    if invalid_model_task_config:
-        pytest.skip(invalid_model_task_config)
+class TestModelTask(DistributedTest):
+    world_size = 1
 
-    model, task = model_w_task
+    def test(
+        self,
+        model_w_task,
+        dtype,
+        enable_cuda_graph,
+        query,
+        inf_kwargs,
+        assert_fn,
+        invalid_model_task_config,
+    ):
+        if invalid_model_task_config:
+            pytest.skip(invalid_model_task_config)
 
-    @distributed_test(world_size=[1])
-    def _go():
+        model, task = model_w_task
         local_rank = int(os.getenv("LOCAL_RANK", "0"))
 
         if "gpt-j-6B" in model and dtype == torch.half:
@@ -225,8 +240,8 @@ def test_model_task(
             pipe.model.half()
 
         # Warm-up queries for perf measurement
-        for i in range(10):
-            _ = pipe(query, **inf_kwargs)
+        #for i in range(10):
+        #    _ = pipe(query, **inf_kwargs)
         torch.cuda.synchronize()
         start = time.time()
         bs_output = pipe(query, **inf_kwargs)
@@ -242,8 +257,8 @@ def test_model_task(
             enable_cuda_graph=enable_cuda_graph,
         )
         # Warm-up queries for perf measurement
-        for i in range(10):
-            _ = pipe(query, **inf_kwargs)
+        #for i in range(10):
+        #    _ = pipe(query, **inf_kwargs)
         torch.cuda.synchronize()
         start = time.time()
         ds_output = pipe(query, **inf_kwargs)
@@ -258,8 +273,6 @@ def test_model_task(
         #assert ds_time <= (bs_time * 1.1)
         assert assert_fn(bs_output, ds_output)
 
-    _go()
-
 
 @pytest.mark.nightly
 @pytest.mark.parametrize(
@@ -274,9 +287,10 @@ def test_model_task(
     ),
 )
 @pytest.mark.parametrize("task", ["lambada"])
-def test_lm_correctness(model_family, model_name, task):
-    @distributed_test(world_size=[1])
-    def _go():
+class TestLMCorrectness(DistributedTest):
+    world_size = 1
+
+    def test(self, model_family, model_name, task):
         local_rank = os.getenv("LOCAL_RANK", "0")
         device = torch.device(f"cuda:{local_rank}")
         dtype = torch.float
@@ -320,5 +334,3 @@ def test_lm_correctness(model_family, model_name, task):
         ds_output["results"][task]["ppl"])
         #assert ds_time <= bs_time
         assert ppl_diff < 0.01
-
-    _go()
diff --git a/tests/unit/test_monitor.py b/tests/unit/monitor/test_monitor.py
similarity index 69%
rename from tests/unit/test_monitor.py
rename to tests/unit/monitor/test_monitor.py
index a417fb9f7..674a8d7ce 100644
--- a/tests/unit/test_monitor.py
+++ b/tests/unit/monitor/test_monitor.py
@@ -1,36 +1,17 @@
-import pytest
-
 from deepspeed.monitor.constants import *
 
 from deepspeed.monitor.tensorboard import TensorBoardMonitor
 from deepspeed.monitor.wandb import WandbMonitor
 from deepspeed.monitor.csv_monitor import csvMonitor
-from .simple_model import *
-from .common import distributed_test
+from tests.unit.common import DistributedTest
 from deepspeed.runtime.config import DeepSpeedConfig
 
-try:
-    import tensorboard  # noqa: F401
-    _tb_available = True
-except ImportError:
-    _tb_available = False
-tb_available = pytest.mark.skipif(not _tb_available,
-                                  reason="tensorboard is not installed")
-try:
-    import wandb  # noqa: F401
-    _wandb_available = True
-except ImportError:
-    _wandb_available = False
-wandb_available = pytest.mark.skipif(not _wandb_available,
-                                     reason="wandb is not installed")
 
+class TestTensorBoard(DistributedTest):
+    world_size = 2
 
-@tb_available
-def test_tensorboard(tmpdir):
-    @distributed_test(world_size=2)
-    def _test_tensorboard():
+    def test_tensorboard(self):
         config_dict = {
             "train_batch_size": 2,
             "tensorboard": {
@@ -45,13 +26,7 @@ def test_tensorboard(tmpdir):
         assert tb_monitor.output_path == "test_output/ds_logs/"
         assert tb_monitor.job_name == "test"
 
-    _test_tensorboard()
-
-
-@tb_available
-def test_empty_tensorboard(tmpdir):
-    @distributed_test(world_size=2)
-    def _test_empty_tensorboard():
+    def test_empty_tensorboard(self):
         config_dict = {"train_batch_size": 2, "tensorboard": {}}
         ds_config = DeepSpeedConfig(config_dict)
         tb_monitor = TensorBoardMonitor(ds_config.monitor_config)
@@ -59,13 +34,11 @@ def test_empty_tensorboard(tmpdir):
         assert tb_monitor.output_path == TENSORBOARD_OUTPUT_PATH_DEFAULT
         assert tb_monitor.job_name == TENSORBOARD_JOB_NAME_DEFAULT
 
-    _test_empty_tensorboard()
 
+class TestWandB(DistributedTest):
+    world_size = 2
 
-@wandb_available
-def test_wandb(tmpdir):
-    @distributed_test(world_size=2)
-    def _test_wandb():
+    def test_wandb(self):
         config_dict = {
             "train_batch_size": 2,
             "wandb": {
@@ -82,13 +55,7 @@ def test_wandb(tmpdir):
         assert wandb_monitor.team == "my_team"
         assert wandb_monitor.project == "my_project"
 
-    _test_wandb()
-
-
-@wandb_available
-def test_empty_wandb(tmpdir):
-    @distributed_test(world_size=2)
-    def _test_empty_wandb():
+    def test_empty_wandb(self):
         config_dict = {"train_batch_size": 2, "wandb": {}}
         ds_config = DeepSpeedConfig(config_dict)
         wandb_monitor = WandbMonitor(ds_config.monitor_config)
@@ -97,12 +64,11 @@ def test_empty_wandb(tmpdir):
         assert wandb_monitor.team == WANDB_TEAM_NAME_DEFAULT
         assert wandb_monitor.project == WANDB_PROJECT_NAME_DEFAULT
 
-    _test_empty_wandb()
 
+class TestCSVMonitor(DistributedTest):
+    world_size = 2
 
-def test_csv_monitor(tmpdir):
-    @distributed_test(world_size=2)
-    def _test_csv_monitor():
+    def test_csv_monitor(self):
         config_dict = {
             "train_batch_size": 2,
             "csv_monitor": {
@@ -117,12 +83,7 @@ def test_csv_monitor(tmpdir):
         assert csv_monitor.output_path == "test_output/ds_logs/"
         assert csv_monitor.job_name == "test"
 
-    _test_csv_monitor()
-
-
-def test_empty_csv_monitor(tmpdir):
-    @distributed_test(world_size=2)
-    def _test_empty_csv_monitor():
+    def test_empty_csv_monitor(self):
         config_dict = {"train_batch_size": 2, "csv_monitor": {}}
         ds_config = DeepSpeedConfig(config_dict)
         csv_monitor = csvMonitor(ds_config.monitor_config)
diff --git a/tests/unit/test_cpu_adagrad.py b/tests/unit/ops/adagrad/test_cpu_adagrad.py
similarity index 100%
rename from tests/unit/test_cpu_adagrad.py
rename to tests/unit/ops/adagrad/test_cpu_adagrad.py
diff --git a/tests/unit/test_adamw.py b/tests/unit/ops/adam/test_adamw.py
similarity index 67%
rename from tests/unit/test_adamw.py
rename to tests/unit/ops/adam/test_adamw.py
index b4bfbf3c2..c898d0c0e 100644
--- a/tests/unit/test_adamw.py
+++ b/tests/unit/ops/adam/test_adamw.py
@@ -4,8 +4,8 @@ import pytest
 
 from deepspeed.ops.adam import FusedAdam
 from deepspeed.ops.adam import DeepSpeedCPUAdam
-from .common import distributed_test
-from .simple_model import SimpleModel, args_from_dict
+from tests.unit.common import DistributedTest
+from tests.unit.simple_model import SimpleModel
 
 # yapf: disable
 #'optimizer, zero_offload, torch_adam, adam_w_mode, resulting_optimizer
@@ -29,38 +29,37 @@ adam_configs = [["AdamW", False, False, False, (FusedAdam, True)],
 @pytest.mark.parametrize(
     'optimizer, zero_offload, torch_adam, adam_w_mode, resulting_optimizer',
     adam_configs)
-def test_adam_configs(tmpdir,
-                      optimizer,
-                      zero_offload,
-                      torch_adam,
-                      adam_w_mode,
-                      resulting_optimizer):
-    config_dict = {
-        "train_batch_size": 2,
-        "steps_per_print": 1,
-        "optimizer": {
-            "type": optimizer,
-            "params": {
-                "lr": 0.00015,
-                "torch_adam": torch_adam,
-                "adam_w_mode": adam_w_mode
-            }
-        },
-        "gradient_clipping": 1.0,
-        "fp16": {
-            "enabled": True
-        },
-        "zero_optimization": {
-            "stage": 2,
-            "cpu_offload": zero_offload
-        }
-    }
-    args = args_from_dict(tmpdir, config_dict)
+class TestAdamConfigs(DistributedTest):
+    world_size = 1
 
-    @distributed_test(world_size=[1])
-    def helper(args):
+    def test(self,
+             optimizer,
+             zero_offload,
+             torch_adam,
+             adam_w_mode,
+             resulting_optimizer):
+        config_dict = {
+            "train_batch_size": 2,
+            "steps_per_print": 1,
+            "optimizer": {
+                "type": optimizer,
+                "params": {
+                    "lr": 0.00015,
+                    "torch_adam": torch_adam,
+                    "adam_w_mode": adam_w_mode
+                }
+            },
+            "gradient_clipping": 1.0,
+            "fp16": {
+                "enabled": True
+            },
+            "zero_optimization": {
+                "stage": 2,
+                "cpu_offload": zero_offload
+            }
+        }
         model = SimpleModel(10)
-        model, _, _, _ = deepspeed.initialize(args=args,
+        model, _, _, _ = deepspeed.initialize(config=config_dict,
                                               model=model,
                                               model_parameters=model.parameters())
         # get base optimizer under zero
@@ -69,5 +68,3 @@ def test_adam_configs(tmpdir,
         assert isinstance(ds_optimizer, opt_class)
         if adam_w_mode in [True, False]:
             assert ds_optimizer.adam_w_mode == adam_w_mode
-
-    helper(args)
diff --git a/tests/unit/test_cpu_adam.py b/tests/unit/ops/adam/test_cpu_adam.py
similarity index 100%
rename from tests/unit/test_cpu_adam.py
rename to tests/unit/ops/adam/test_cpu_adam.py
diff --git a/tests/unit/test_flops_profiler.py b/tests/unit/profiling/flops_profiler/test_flops_profiler.py
similarity index 76%
rename from tests/unit/test_flops_profiler.py
rename to tests/unit/profiling/flops_profiler/test_flops_profiler.py
index 9a01f5c6a..734e2996f 100644
--- a/tests/unit/test_flops_profiler.py
+++ b/tests/unit/profiling/flops_profiler/test_flops_profiler.py
@@ -2,8 +2,8 @@ import torch
 import pytest
 import deepspeed
 from deepspeed.profiling.flops_profiler import get_model_profile
-from .simple_model import SimpleModel, random_dataloader, args_from_dict
-from .common import distributed_test
+from tests.unit.simple_model import SimpleModel, random_dataloader
+from tests.unit.common import DistributedTest
 
 TORCH_MAJOR = int(torch.__version__.split('.')[0])
 TORCH_MINOR = int(torch.__version__.split('.')[1])
@@ -19,36 +19,36 @@ def within_range(val, target, tolerance):
 TOLERANCE = 0.05
 
 
-def test_flops_profiler_in_ds_training(tmpdir):
-    config_dict = {
-        "train_batch_size": 1,
-        "steps_per_print": 1,
-        "optimizer": {
-            "type": "Adam",
-            "params": {
-                "lr": 0.001,
-            }
-        },
-        "zero_optimization": {
-            "stage": 0
-        },
-        "fp16": {
-            "enabled": True,
-        },
-        "flops_profiler": {
-            "enabled": True,
-            "step": 1,
-            "module_depth": -1,
-            "top_modules": 3,
-        },
-    }
-    args = args_from_dict(tmpdir, config_dict)
-    hidden_dim = 10
-    model = SimpleModel(hidden_dim, empty_grad=False)
+class TestFlopsProfilerInDSTraining(DistributedTest):
+    world_size = 1
 
-    @distributed_test(world_size=[1])
-    def _test_flops_profiler_in_ds_training(args, model, hidden_dim):
-        model, _, _, _ = deepspeed.initialize(args=args,
+    def test(self):
+        config_dict = {
+            "train_batch_size": 1,
+            "steps_per_print": 1,
+            "optimizer": {
+                "type": "Adam",
+                "params": {
+                    "lr": 0.001,
+                }
+            },
+            "zero_optimization": {
+                "stage": 0
+            },
+            "fp16": {
+                "enabled": True,
+            },
+            "flops_profiler": {
+                "enabled": True,
+                "step": 1,
+                "module_depth": -1,
+                "top_modules": 3,
+            },
+        }
+        hidden_dim = 10
+        model = SimpleModel(hidden_dim, empty_grad=False)
+
+        model, _, _, _ = deepspeed.initialize(config=config_dict,
                                               model=model,
                                               model_parameters=model.parameters())
 
@@ -65,8 +65,6 @@ def test_flops_profiler_in_ds_training(tmpdir):
         assert within_range(model.flops_profiler.flops, 200, tolerance=TOLERANCE)
         assert model.flops_profiler.params == 110
 
-    _test_flops_profiler_in_ds_training(args, model, hidden_dim)
-
 
 class LeNet5(torch.nn.Module):
     def __init__(self, n_classes):
diff --git a/tests/unit/test_topology.py b/tests/unit/runtime/pipe/test_topology.py
similarity index 77%
rename from tests/unit/test_topology.py
rename to tests/unit/runtime/pipe/test_topology.py
index 78b786701..ae6fb7ecf 100644
--- a/tests/unit/test_topology.py
+++ b/tests/unit/runtime/pipe/test_topology.py
@@ -7,7 +7,7 @@ from deepspeed.runtime.pipe.topology import PipelineParallelGrid as Grid
 from deepspeed.runtime.pipe.topology import ProcessTopology as Topo
 from deepspeed.runtime.pipe.topology import _prime_factors
 
-from .common import distributed_test
+from tests.unit.common import DistributedTest
 
 
 def test_topology_2d():
@@ -157,51 +157,51 @@ def test_topology_comm_list():
     assert topo.get_axis_comm_lists('jeff') == []
 
 
-@distributed_test(world_size=4)
-def test_grid_pipe_data():
-    topo = Topo(axes=['pipe', 'data'], dims=[2, 2])
-    grid = Grid(topology=topo)
+class TestDistributedTopology(DistributedTest):
+    world_size = 4
 
-    assert grid._is_grid_valid()
+    def test_grid_pipe_data(self):
+        topo = Topo(axes=['pipe', 'data'], dims=[2, 2])
+        grid = Grid(topology=topo)
 
-    rank = dist.get_rank()
+        assert grid._is_grid_valid()
 
-    assert grid.is_first_stage == (grid.get_stage_id() == 0)
-    assert grid.is_last_stage == (
-        grid.get_stage_id() == grid.get_pipe_parallel_world_size() - 1)
+        rank = dist.get_rank()
 
-    # Test collectives along the pipeline parallel process groups
-    rank_tensor = torch.LongTensor(data=[rank]).cuda()
-    dist.all_reduce(rank_tensor, group=grid.get_pipe_parallel_group())
-    pipe_group = grid.pp_group
-    assert torch.all(rank_tensor == sum(pipe_group))
+        assert grid.is_first_stage == (grid.get_stage_id() == 0)
+        assert grid.is_last_stage == (
+            grid.get_stage_id() == grid.get_pipe_parallel_world_size() - 1)
 
-    # Test collectives along the data parallel process groups
-    rank_tensor = torch.LongTensor(data=[rank]).cuda()
-    dist.all_reduce(rank_tensor, group=grid.get_data_parallel_group())
-    data_group = grid.dp_group
-    assert torch.all(rank_tensor == sum(data_group))
+        # Test collectives along the pipeline parallel process groups
+        rank_tensor = torch.LongTensor(data=[rank]).cuda()
+        dist.all_reduce(rank_tensor, group=grid.get_pipe_parallel_group())
+        pipe_group = grid.pp_group
+        assert torch.all(rank_tensor == sum(pipe_group))
 
+        # Test collectives along the data parallel process groups
+        rank_tensor = torch.LongTensor(data=[rank]).cuda()
+        dist.all_reduce(rank_tensor, group=grid.get_data_parallel_group())
+        data_group = grid.dp_group
+        assert torch.all(rank_tensor == sum(data_group))
 
-@distributed_test(world_size=4)
-def test_stage_to_global():
-    topo = Topo(axes=['pipe', 'data'], dims=[2, 2])
-    grid = Grid(topology=topo)
+    def test_stage_to_global(self):
+        topo = Topo(axes=['pipe', 'data'], dims=[2, 2])
+        grid = Grid(topology=topo)
 
-    assert grid._is_grid_valid()
+        assert grid._is_grid_valid()
 
-    assert grid.stage_to_global(stage_id=0, data=0) == 0
-    assert grid.stage_to_global(stage_id=0, data=1) == 1
-    assert grid.stage_to_global(stage_id=1, data=0) == 2
-    assert grid.stage_to_global(stage_id=1, data=1) == 3
+        assert grid.stage_to_global(stage_id=0, data=0) == 0
+        assert grid.stage_to_global(stage_id=0, data=1) == 1
+        assert grid.stage_to_global(stage_id=1, data=0) == 2
+        assert grid.stage_to_global(stage_id=1, data=1) == 3
 
-    me = topo.get_coord(rank=dist.get_rank())
-    if me.data == 0:
-        assert grid.stage_to_global(stage_id=0) == 0
-        assert grid.stage_to_global(stage_id=1) == 2
-    else:
-        assert grid.stage_to_global(stage_id=0) == 1
-        assert grid.stage_to_global(stage_id=1) == 3
+        me = topo.get_coord(rank=dist.get_rank())
+        if me.data == 0:
+            assert grid.stage_to_global(stage_id=0) == 0
+            assert grid.stage_to_global(stage_id=1) == 2
+        else:
+            assert grid.stage_to_global(stage_id=0) == 1
+            assert grid.stage_to_global(stage_id=1) == 3
 
 
 def test_primes():
diff --git a/tests/unit/test_dist.py b/tests/unit/test_dist.py
deleted file mode 100644
index 6e6fabbfa..000000000
--- a/tests/unit/test_dist.py
+++ /dev/null
@@ -1,38 +0,0 @@
-import torch
-import deepspeed.comm as dist
-
-from .common import distributed_test
-
-import pytest
-
-
-@distributed_test(world_size=3)
-def test_init():
-    assert dist.is_initialized()
-    assert dist.get_world_size() == 3
-    assert dist.get_rank() < 3
-
-
-# Demonstration of pytest's parameterization
-@pytest.mark.parametrize('number,color', [(1138, 'purple')])
-def test_dist_args(number, color):
-    """Outer test function with inputs from pytest.mark.parametrize(). Uses a distributed
-    helper function.
-    """
-    @distributed_test(world_size=2)
-    def _test_dist_args_helper(x, color='red'):
-        assert dist.get_world_size() == 2
-        assert x == 1138
-        assert color == 'purple'
-
-    """Ensure that we can parse args to distributed_test decorated functions. """
-    _test_dist_args_helper(number, color=color)
-
-
-@distributed_test(world_size=[1, 2, 4])
-def test_dist_allreduce():
-    x = torch.ones(1, 3).cuda() * (dist.get_rank() + 1)
-    sum_of_ranks = (dist.get_world_size() * (dist.get_world_size() + 1)) // 2
-    result = torch.ones(1, 3).cuda() * sum_of_ranks
-    dist.all_reduce(x)
-    assert torch.all(x == result)
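
Usage note for reviewers: the DistributedTest base class in tests/unit/common.py replaces the distributed_test decorator. The pytest_runtest_call hook in tests/conftest.py detects is_dist_test = True and launches each test method across world_size processes, resolving fixture and parametrize arguments by name. A minimal sketch of the migration pattern follows; the class name, parameter, and values are illustrative and do not appear in this patch:

import pytest
import deepspeed.comm as dist

from tests.unit.common import DistributedTest


@pytest.mark.parametrize("hidden_dim", [10])
class TestExample(DistributedTest):
    # world_size may also be a list, e.g. [1, 2, 4], to repeat the test
    # at several process-group sizes
    world_size = 2

    def test(self, hidden_dim):
        # Body runs once per rank; deepspeed.comm is already initialized
        assert dist.is_initialized()
        assert dist.get_world_size() == 2
        assert hidden_dim == 10

Because arguments are fetched with request.getfixturevalue, the test method's signature alone determines which fixtures and parametrize values are passed in; there is no longer an inner helper function to invoke.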