Fix for distributed tests on pytorch>=1.12 (#2141)

Fix for distributed tests
Co-authored-by: Olatunji Ruwase <olruwase@microsoft.com>
Michael Wyatt 2022-08-01 16:51:09 -07:00, committed by GitHub
Parent b005db86fc
Commit 1a71e77dc2
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 360 additions and 248 deletions

7
.github/workflows/nv-inference.yml

@@ -17,7 +17,7 @@ concurrency:
jobs:
unit-tests:
runs-on: [self-hosted, nvidia, cu111, v100]
runs-on: [self-hosted, nvidia, cu113, v100]
steps:
- uses: actions/checkout@v2
@@ -31,7 +31,7 @@ jobs:
nvcc --version
pip install --upgrade pip
pip uninstall --yes torch torchvision
pip install torch==1.8.2+cu111 torchvision==0.9.2+cu111 -f https://download.pytorch.org/whl/lts/1.8/torch_lts.html
pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cu113
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@@ -60,4 +60,5 @@ jobs:
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
cd tests
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -m 'inference' unit/ --torch_ver="1.8" --cuda_ver="11.1"
EXPECTED_TORCH=$(pip index versions torch | grep -oP -m1 "^\s*LATEST.*\s\K\d+\.\d+")
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -m 'inference' unit/ --torch_ver=$EXPECTED_TORCH --cuda_ver="11.3"
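The EXPECTED_TORCH line above pulls the newest major.minor torch release reported by "pip index versions torch" so the conftest environment check can compare it against the installed torch. A minimal Python sketch of the same extraction, using a hypothetical sample of the pip output (the real format may vary by pip version):

import re

# Hypothetical `pip index versions torch` output; real output may differ slightly
sample = "torch (1.12.1)\nAvailable versions: 1.12.1, 1.12.0, 1.11.0\n  INSTALLED: 1.12.1\n  LATEST:    1.12.1"

# Mirrors `grep -oP -m1 "^\s*LATEST.*\s\K\d+\.\d+"`: keep only major.minor after the LATEST label
match = re.search(r"^\s*LATEST.*?(\d+\.\d+)", sample, flags=re.MULTILINE)
expected_torch = match.group(1) if match else None
print(expected_torch)  # 1.12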

5
.github/workflows/nv-torch-latest-v100.yml

@@ -60,5 +60,6 @@ jobs:
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
cd tests
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -n 4 unit/
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -m 'sequential' unit/
EXPECTED_TORCH=$(pip index versions torch | grep -oP -m1 "^\s*LATEST.*\s\K\d+\.\d+")
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -n 4 unit/ --torch_ver=$EXPECTED_TORCH --cuda_ver="11.3"
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -m 'sequential' unit/ --torch_ver=$EXPECTED_TORCH --cuda_ver="11.3"

2
.github/workflows/nv-transformers-v100.yml

@@ -62,4 +62,4 @@ jobs:
# force protobuf version due to issues
pip install "protobuf<4.21.0"
pip list
TORCH_EXTENSIONS_DIR=./torch-extensions RUN_SLOW=1 pytest --color=yes --durations=0 --verbose tests/deepspeed
WANDB_DISABLED=true TORCH_EXTENSIONS_DIR=./torch-extensions RUN_SLOW=1 pytest --color=yes --durations=0 --verbose tests/deepspeed

(dev requirements file)

@@ -1,5 +1,6 @@ clang-format
clang-format
docutils<0.18
future
importlib-metadata>=4
megatron-lm==1.1.5
pre-commit
@@ -10,5 +11,7 @@ pytest-xdist
recommonmark
sphinx
sphinx-rtd-theme
tensorboard
torchvision
transformers
wandb

0
tests/__init__.py (new file)

(tests conftest.py)

@@ -43,3 +43,14 @@ def check_environment(pytestconfig):
pytest.exit(
f"expected cuda version {expected_cuda_version} did not match found cuda version {torch.version.cuda}",
returncode=2)
# Override of pytest "runtest" for DistributedTest class
# This hook is run before the default pytest_runtest_call
@pytest.hookimpl(tryfirst=True)
def pytest_runtest_call(item):
# We want to use our own launching function for distributed tests
if getattr(item.cls, "is_dist_test", False):
dist_test_class = item.cls()
dist_test_class._run_test(item._request)
item.runtest = lambda: True # Dummy function so test is not run twice
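For reference, a minimal sketch of the contract this hook relies on (FakeDistTest is a hypothetical class, not part of this diff): any collected test class that sets is_dist_test = True and provides _run_test(request) is routed through _run_test instead of pytest's default call phase.

class FakeDistTest:
    # Opt-in flag that pytest_runtest_call above checks via getattr(item.cls, "is_dist_test", False)
    is_dist_test = True

    def _run_test(self, request):
        # DistributedTest in tests/unit/common.py spawns worker processes here;
        # this stand-in simply invokes the requested test method directly.
        getattr(self, request.function.__name__)()

    def test_example(self):
        assert True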

(new class-based distributed test examples)

@@ -0,0 +1,62 @@
import torch
import deepspeed.comm as dist
from tests.unit.common import DistributedTest
import pytest
class TestInit(DistributedTest):
world_size = 3
def test(self):
assert dist.is_initialized()
assert dist.get_world_size() == 3
assert dist.get_rank() < 3
# Demonstration of pytest's parameterization and fixtures
@pytest.fixture(params=["hello"])
def greeting(request):
return request.param
@pytest.mark.parametrize("number,color", [(1138, "purple")])
class TestDistArgs(DistributedTest):
world_size = 2
""" Classes that use DistributedTest class must define a test* method """
@pytest.mark.parametrize("shape", ["icosahedron"])
def test(self, number, color, shape, greeting):
"""Ensure that we can parse args to DistributedTest methods. """
assert dist.get_world_size() == 2
assert number == 1138
assert color == "purple"
assert shape == "icosahedron"
assert greeting == "hello"
# Demonstration of distributed tests grouped in single class
@pytest.mark.parametrize("number", [1138])
class TestGroupedDistTest(DistributedTest):
world_size = 2
def test_one(self, number):
assert dist.get_world_size() == 2
assert number == 1138
@pytest.mark.parametrize("color", ["purple"])
def test_two(self, number, color):
assert dist.get_world_size() == 2
assert number == 1138
assert color == "purple"
class TestDistAllReduce(DistributedTest):
world_size = [1, 2, 4]
def test(self):
x = torch.ones(1, 3).cuda() * (dist.get_rank() + 1)
sum_of_ranks = (dist.get_world_size() * (dist.get_world_size() + 1)) // 2
result = torch.ones(1, 3).cuda() * sum_of_ranks
dist.all_reduce(x)
assert torch.all(x == result)

tests/unit/common.py

@@ -1,15 +1,17 @@
import os
import time
import inspect
from abc import ABC
from pathlib import Path
import torch
import torch.multiprocessing as mp
import deepspeed
import deepspeed.comm as dist
from torch.multiprocessing import Process
import deepspeed
import pytest
from pathlib import Path
from _pytest.outcomes import Skipped
# Worker timeout *after* the first worker has completed.
DEEPSPEED_UNIT_WORKER_TIMEOUT = 120
@@ -60,6 +62,108 @@ def set_cuda_visibile():
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(dev_id_list)
class DistributedTest(ABC):
is_dist_test = True
world_size = 2
backend = "nccl"
def _run_test(self, request):
self.current_test = self._get_current_test_func(request)
self.test_kwargs = self._get_test_kwargs(request)
if isinstance(self.world_size, int):
self.world_size = [self.world_size]
for procs in self.world_size:
self._launch_procs(procs)
time.sleep(0.5)
def _get_current_test_func(self, request):
# DistributedTest subclasses may have multiple test methods
func_name = request.function.__name__
return getattr(self, func_name)
def _get_test_kwargs(self, request):
# Grab fixture / parametrize kwargs from pytest request object
test_kwargs = {}
params = inspect.getfullargspec(self.current_test).args
params.remove("self")
for p in params:
test_kwargs[p] = request.getfixturevalue(p)
return test_kwargs
def _launch_procs(self, num_procs):
mp.set_start_method('forkserver', force=True)
skip_msg = mp.Queue() # Allows forked processes to share pytest.skip reason
processes = []
for local_rank in range(num_procs):
p = Process(target=self._dist_init, args=(local_rank, num_procs, skip_msg))
p.start()
processes.append(p)
# Now loop and wait for a test to complete. The spin-wait here isn't a big
# deal because the number of processes will be O(#GPUs) << O(#CPUs).
any_done = False
while not any_done:
for p in processes:
if not p.is_alive():
any_done = True
break
# Wait for all other processes to complete
for p in processes:
p.join(DEEPSPEED_UNIT_WORKER_TIMEOUT)
failed = [(rank, p) for rank, p in enumerate(processes) if p.exitcode != 0]
for rank, p in failed:
# If it still hasn't terminated, kill it because it hung.
if p.exitcode is None:
p.terminate()
pytest.fail(f'Worker {rank} hung.', pytrace=False)
if p.exitcode < 0:
pytest.fail(f'Worker {rank} killed by signal {-p.exitcode}',
pytrace=False)
if p.exitcode > 0:
pytest.fail(f'Worker {rank} exited with code {p.exitcode}',
pytrace=False)
if not skip_msg.empty():
# This assumes all skip messages are the same; it may be useful to
# add a check here to assert all skip messages are equal
pytest.skip(skip_msg.get())
def _dist_init(self, local_rank, num_procs, skip_msg):
"""Initialize deepspeed.comm and execute the user function. """
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = get_master_port()
os.environ['LOCAL_RANK'] = str(local_rank)
# NOTE: unit tests don't support multi-node so local_rank == global rank
os.environ['RANK'] = str(local_rank)
os.environ['WORLD_SIZE'] = str(num_procs)
# turn off NCCL logging if set
os.environ.pop('NCCL_DEBUG', None)
set_cuda_visibile()
deepspeed.init_distributed(dist_backend=self.backend)
dist.barrier()
if torch.cuda.is_available():
torch.cuda.set_device(local_rank)
try:
self.current_test(**self.test_kwargs)
except BaseException as e:
if isinstance(e, Skipped):
skip_msg.put(e.msg)
else:
raise e
# make sure all ranks finish at the same time
dist.barrier()
# tear down after test completes
dist.destroy_process_group()
def distributed_test(world_size=2, backend='nccl'):
"""A decorator for executing a function (e.g., a unit test) in a distributed manner.
This decorator manages the spawning and joining of processes, initialization of
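Taken together, the DistributedTest class above is the replacement for the distributed_test decorator, and the remaining files in this commit follow the same migration. A hedged before/after sketch of the pattern (the test body and names are illustrative only):

import deepspeed.comm as dist
from tests.unit.common import DistributedTest, distributed_test

# Old style: decorate a nested helper and call it from the outer test function
def test_example_old():
    @distributed_test(world_size=2)
    def _helper():
        assert dist.get_world_size() == 2
    _helper()

# New style: subclass DistributedTest; the world_size attribute replaces the decorator argument
class TestExample(DistributedTest):
    world_size = 2

    def test(self):
        assert dist.get_world_size() == 2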

(inference unit tests)

@@ -5,13 +5,15 @@ import pytest
import itertools
import deepspeed
from deepspeed.git_version_info import torch_info
from .common import distributed_test
from tests.unit.common import DistributedTest
from packaging import version as pkg_version
from deepspeed.ops.op_builder import OpBuilder
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
from huggingface_hub import HfApi
# Fixture avoids problems with missing imports when pytest collects tests
# during non-inference runs
@pytest.fixture(scope="module", autouse=True)
def lm_eval_imports():
global lm_eval
@@ -159,29 +161,41 @@ def inf_kwargs(model_w_task):
return {}
def fill_mask_assert(x, y):
return set(res["token_str"] for res in x) == set(res["token_str"] for res in y)
def question_answering_assert(x, y):
return x["answer"] == y["answer"]
def text_classification_assert(x, y):
return set(res["label"] for res in x) == set(res["label"] for res in y)
def token_classification_assert(x, y):
return set(ent["word"] for ent in x) == set(ent["word"] for ent in y)
def text_generation_assert(x, y):
return set(res["generated_text"] for res in x) == set(res["generated_text"]
for res in y)
@pytest.fixture
def assert_fn(model_w_task):
model, task = model_w_task
if task == "fill-mask":
return lambda x, y: set(res["token_str"] for res in x) == set(
res["token_str"] for res in y
)
elif task == "question-answering":
return lambda x, y: x["answer"] == y["answer"]
elif task == "text-classification":
return lambda x, y: set(res["label"] for res in x) == set(
res["label"] for res in y
)
elif task == "token-classification":
return lambda x, y: set(ent["word"] for ent in x) == set(
ent["word"] for ent in y
)
elif task == "text-generation":
return lambda x, y: set(res["generated_text"] for res in x) == set(
res["generated_text"] for res in y
)
else:
assert_fn_dict = {
"fill-mask": fill_mask_assert,
"question-answering": question_answering_assert,
"text-classification": text_classification_assert,
"token-classification": token_classification_assert,
"text-generation": text_generation_assert,
}
assert_fn = assert_fn_dict.get(task, None)
if assert_fn is None:
raise NotImplementedError(f'assert_fn for task "{task}" is not implemented')
return assert_fn
"""
@@ -190,22 +204,23 @@ Tests
@pytest.mark.inference
def test_model_task(
model_w_task,
dtype,
enable_cuda_graph,
query,
inf_kwargs,
assert_fn,
invalid_model_task_config,
):
if invalid_model_task_config:
pytest.skip(invalid_model_task_config)
class TestModelTask(DistributedTest):
world_size = 1
model, task = model_w_task
def test(
self,
model_w_task,
dtype,
enable_cuda_graph,
query,
inf_kwargs,
assert_fn,
invalid_model_task_config,
):
if invalid_model_task_config:
pytest.skip(invalid_model_task_config)
@distributed_test(world_size=[1])
def _go():
model, task = model_w_task
local_rank = int(os.getenv("LOCAL_RANK", "0"))
if "gpt-j-6B" in model and dtype == torch.half:
@@ -225,8 +240,8 @@ def test_model_task(
pipe.model.half()
# Warm-up queries for perf measurement
for i in range(10):
_ = pipe(query, **inf_kwargs)
#for i in range(10):
# _ = pipe(query, **inf_kwargs)
torch.cuda.synchronize()
start = time.time()
bs_output = pipe(query, **inf_kwargs)
@@ -242,8 +257,8 @@ def test_model_task(
enable_cuda_graph=enable_cuda_graph,
)
# Warm-up queries for perf measurement
for i in range(10):
_ = pipe(query, **inf_kwargs)
#for i in range(10):
# _ = pipe(query, **inf_kwargs)
torch.cuda.synchronize()
start = time.time()
ds_output = pipe(query, **inf_kwargs)
@@ -258,8 +273,6 @@ def test_model_task(
#assert ds_time <= (bs_time * 1.1)
assert assert_fn(bs_output, ds_output)
_go()
@pytest.mark.nightly
@pytest.mark.parametrize(
@@ -274,9 +287,10 @@ def test_model_task(
),
)
@pytest.mark.parametrize("task", ["lambada"])
def test_lm_correctness(model_family, model_name, task):
@distributed_test(world_size=[1])
def _go():
class TestLMCorrectness(DistributedTest):
world_size = 1
def test(self, model_family, model_name, task):
local_rank = os.getenv("LOCAL_RANK", "0")
device = torch.device(f"cuda:{local_rank}")
dtype = torch.float
@@ -320,5 +334,3 @@ def test_lm_correctness(model_family, model_name, task):
ds_output["results"][task]["ppl"])
#assert ds_time <= bs_time
assert ppl_diff < 0.01
_go()

(monitor unit tests)

@@ -1,36 +1,17 @@
import pytest
from deepspeed.monitor.constants import *
from deepspeed.monitor.tensorboard import TensorBoardMonitor
from deepspeed.monitor.wandb import WandbMonitor
from deepspeed.monitor.csv_monitor import csvMonitor
from .simple_model import *
from .common import distributed_test
from tests.unit.common import DistributedTest
from deepspeed.runtime.config import DeepSpeedConfig
try:
import tensorboard # noqa: F401
_tb_available = True
except ImportError:
_tb_available = False
tb_available = pytest.mark.skipif(not _tb_available,
reason="tensorboard is not installed")
try:
import wandb # noqa: F401
_wandb_available = True
except ImportError:
_wandb_available = False
wandb_available = pytest.mark.skipif(not _wandb_available,
reason="wandb is not installed")
class TestTensorBoard(DistributedTest):
world_size = 2
@tb_available
def test_tensorboard(tmpdir):
@distributed_test(world_size=2)
def _test_tensorboard():
def test_tensorboard(self):
config_dict = {
"train_batch_size": 2,
"tensorboard": {
@@ -45,13 +26,7 @@ def test_tensorboard(tmpdir):
assert tb_monitor.output_path == "test_output/ds_logs/"
assert tb_monitor.job_name == "test"
_test_tensorboard()
@tb_available
def test_empty_tensorboard(tmpdir):
@distributed_test(world_size=2)
def _test_empty_tensorboard():
def test_empty_tensorboard(self):
config_dict = {"train_batch_size": 2, "tensorboard": {}}
ds_config = DeepSpeedConfig(config_dict)
tb_monitor = TensorBoardMonitor(ds_config.monitor_config)
@@ -59,13 +34,11 @@ def test_empty_tensorboard(tmpdir):
assert tb_monitor.output_path == TENSORBOARD_OUTPUT_PATH_DEFAULT
assert tb_monitor.job_name == TENSORBOARD_JOB_NAME_DEFAULT
_test_empty_tensorboard()
class TestWandB(DistributedTest):
world_size = 2
@wandb_available
def test_wandb(tmpdir):
@distributed_test(world_size=2)
def _test_wandb():
def test_wandb(self):
config_dict = {
"train_batch_size": 2,
"wandb": {
@@ -82,13 +55,7 @@ def test_wandb(tmpdir):
assert wandb_monitor.team == "my_team"
assert wandb_monitor.project == "my_project"
_test_wandb()
@wandb_available
def test_empty_wandb(tmpdir):
@distributed_test(world_size=2)
def _test_empty_wandb():
def test_empty_wandb(self):
config_dict = {"train_batch_size": 2, "wandb": {}}
ds_config = DeepSpeedConfig(config_dict)
wandb_monitor = WandbMonitor(ds_config.monitor_config)
@@ -97,12 +64,11 @@ def test_empty_wandb(tmpdir):
assert wandb_monitor.team == WANDB_TEAM_NAME_DEFAULT
assert wandb_monitor.project == WANDB_PROJECT_NAME_DEFAULT
_test_empty_wandb()
class TestCSVMonitor(DistributedTest):
world_size = 2
def test_csv_monitor(tmpdir):
@distributed_test(world_size=2)
def _test_csv_monitor():
def test_csv_monitor(self):
config_dict = {
"train_batch_size": 2,
"csv_monitor": {
@@ -117,12 +83,7 @@ def test_csv_monitor(tmpdir):
assert csv_monitor.output_path == "test_output/ds_logs/"
assert csv_monitor.job_name == "test"
_test_csv_monitor()
def test_empty_csv_monitor(tmpdir):
@distributed_test(world_size=2)
def _test_empty_csv_monitor():
def test_empty_csv_monitor(self):
config_dict = {"train_batch_size": 2, "csv_monitor": {}}
ds_config = DeepSpeedConfig(config_dict)
csv_monitor = csvMonitor(ds_config.monitor_config)

(Adam optimizer config unit tests)

@@ -4,8 +4,8 @@ import pytest
from deepspeed.ops.adam import FusedAdam
from deepspeed.ops.adam import DeepSpeedCPUAdam
from .common import distributed_test
from .simple_model import SimpleModel, args_from_dict
from tests.unit.common import DistributedTest
from tests.unit.simple_model import SimpleModel
# yapf: disable
#'optimizer, zero_offload, torch_adam, adam_w_mode, resulting_optimizer
@@ -29,38 +29,37 @@ adam_configs = [["AdamW", False, False, False, (FusedAdam, True)],
@pytest.mark.parametrize(
'optimizer, zero_offload, torch_adam, adam_w_mode, resulting_optimizer',
adam_configs)
def test_adam_configs(tmpdir,
optimizer,
zero_offload,
torch_adam,
adam_w_mode,
resulting_optimizer):
config_dict = {
"train_batch_size": 2,
"steps_per_print": 1,
"optimizer": {
"type": optimizer,
"params": {
"lr": 0.00015,
"torch_adam": torch_adam,
"adam_w_mode": adam_w_mode
}
},
"gradient_clipping": 1.0,
"fp16": {
"enabled": True
},
"zero_optimization": {
"stage": 2,
"cpu_offload": zero_offload
}
}
args = args_from_dict(tmpdir, config_dict)
class TestAdamConfigs(DistributedTest):
world_size = 1
@distributed_test(world_size=[1])
def helper(args):
def test(self,
optimizer,
zero_offload,
torch_adam,
adam_w_mode,
resulting_optimizer):
config_dict = {
"train_batch_size": 2,
"steps_per_print": 1,
"optimizer": {
"type": optimizer,
"params": {
"lr": 0.00015,
"torch_adam": torch_adam,
"adam_w_mode": adam_w_mode
}
},
"gradient_clipping": 1.0,
"fp16": {
"enabled": True
},
"zero_optimization": {
"stage": 2,
"cpu_offload": zero_offload
}
}
model = SimpleModel(10)
model, _, _, _ = deepspeed.initialize(args=args,
model, _, _, _ = deepspeed.initialize(config=config_dict,
model=model,
model_parameters=model.parameters())
# get base optimizer under zero
@@ -69,5 +68,3 @@ def test_adam_configs(tmpdir,
assert isinstance(ds_optimizer, opt_class)
if adam_w_mode in [True, False]:
assert ds_optimizer.adam_w_mode == adam_w_mode
helper(args)
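Note that the migrated test also drops the args_from_dict/tmpdir round-trip: the config dict is handed straight to deepspeed.initialize via config=. A minimal hedged sketch of that call pattern (config values are illustrative; inside a DistributedTest worker the distributed environment is already initialized):

import deepspeed
from tests.unit.simple_model import SimpleModel

config_dict = {"train_batch_size": 2, "fp16": {"enabled": True}}
model = SimpleModel(10)
# Passing config= as a dict avoids writing a temporary JSON config and building argparse args
engine, optimizer, _, _ = deepspeed.initialize(config=config_dict,
                                               model=model,
                                               model_parameters=model.parameters())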

(flops profiler unit tests)

@@ -2,8 +2,8 @@ import torch
import pytest
import deepspeed
from deepspeed.profiling.flops_profiler import get_model_profile
from .simple_model import SimpleModel, random_dataloader, args_from_dict
from .common import distributed_test
from tests.unit.simple_model import SimpleModel, random_dataloader
from tests.unit.common import DistributedTest
TORCH_MAJOR = int(torch.__version__.split('.')[0])
TORCH_MINOR = int(torch.__version__.split('.')[1])
@@ -19,36 +19,36 @@ def within_range(val, target, tolerance):
TOLERANCE = 0.05
def test_flops_profiler_in_ds_training(tmpdir):
config_dict = {
"train_batch_size": 1,
"steps_per_print": 1,
"optimizer": {
"type": "Adam",
"params": {
"lr": 0.001,
}
},
"zero_optimization": {
"stage": 0
},
"fp16": {
"enabled": True,
},
"flops_profiler": {
"enabled": True,
"step": 1,
"module_depth": -1,
"top_modules": 3,
},
}
args = args_from_dict(tmpdir, config_dict)
hidden_dim = 10
model = SimpleModel(hidden_dim, empty_grad=False)
class TestFlopsProfilerInDSTraining(DistributedTest):
world_size = 1
@distributed_test(world_size=[1])
def _test_flops_profiler_in_ds_training(args, model, hidden_dim):
model, _, _, _ = deepspeed.initialize(args=args,
def test(self):
config_dict = {
"train_batch_size": 1,
"steps_per_print": 1,
"optimizer": {
"type": "Adam",
"params": {
"lr": 0.001,
}
},
"zero_optimization": {
"stage": 0
},
"fp16": {
"enabled": True,
},
"flops_profiler": {
"enabled": True,
"step": 1,
"module_depth": -1,
"top_modules": 3,
},
}
hidden_dim = 10
model = SimpleModel(hidden_dim, empty_grad=False)
model, _, _, _ = deepspeed.initialize(config=config_dict,
model=model,
model_parameters=model.parameters())
@@ -65,8 +65,6 @@ def test_flops_profiler_in_ds_training(tmpdir):
assert within_range(model.flops_profiler.flops, 200, tolerance=TOLERANCE)
assert model.flops_profiler.params == 110
_test_flops_profiler_in_ds_training(args, model, hidden_dim)
class LeNet5(torch.nn.Module):
def __init__(self, n_classes):

(pipeline topology unit tests)

@@ -7,7 +7,7 @@ from deepspeed.runtime.pipe.topology import PipelineParallelGrid as Grid
from deepspeed.runtime.pipe.topology import ProcessTopology as Topo
from deepspeed.runtime.pipe.topology import _prime_factors
from .common import distributed_test
from tests.unit.common import DistributedTest
def test_topology_2d():
@@ -157,51 +157,51 @@ def test_topology_comm_list():
assert topo.get_axis_comm_lists('jeff') == []
@distributed_test(world_size=4)
def test_grid_pipe_data():
topo = Topo(axes=['pipe', 'data'], dims=[2, 2])
grid = Grid(topology=topo)
class TestDistributedTopology(DistributedTest):
world_size = 4
assert grid._is_grid_valid()
def test_grid_pipe_data(self):
topo = Topo(axes=['pipe', 'data'], dims=[2, 2])
grid = Grid(topology=topo)
rank = dist.get_rank()
assert grid._is_grid_valid()
assert grid.is_first_stage == (grid.get_stage_id() == 0)
assert grid.is_last_stage == (
grid.get_stage_id() == grid.get_pipe_parallel_world_size() - 1)
rank = dist.get_rank()
# Test collectives along the pipeline parallel process groups
rank_tensor = torch.LongTensor(data=[rank]).cuda()
dist.all_reduce(rank_tensor, group=grid.get_pipe_parallel_group())
pipe_group = grid.pp_group
assert torch.all(rank_tensor == sum(pipe_group))
assert grid.is_first_stage == (grid.get_stage_id() == 0)
assert grid.is_last_stage == (
grid.get_stage_id() == grid.get_pipe_parallel_world_size() - 1)
# Test collectives along the data parallel process groups
rank_tensor = torch.LongTensor(data=[rank]).cuda()
dist.all_reduce(rank_tensor, group=grid.get_data_parallel_group())
data_group = grid.dp_group
assert torch.all(rank_tensor == sum(data_group))
# Test collectives along the pipeline parallel process groups
rank_tensor = torch.LongTensor(data=[rank]).cuda()
dist.all_reduce(rank_tensor, group=grid.get_pipe_parallel_group())
pipe_group = grid.pp_group
assert torch.all(rank_tensor == sum(pipe_group))
# Test collectives along the data parallel process groups
rank_tensor = torch.LongTensor(data=[rank]).cuda()
dist.all_reduce(rank_tensor, group=grid.get_data_parallel_group())
data_group = grid.dp_group
assert torch.all(rank_tensor == sum(data_group))
@distributed_test(world_size=4)
def test_stage_to_global():
topo = Topo(axes=['pipe', 'data'], dims=[2, 2])
grid = Grid(topology=topo)
def test_stage_to_global(self):
topo = Topo(axes=['pipe', 'data'], dims=[2, 2])
grid = Grid(topology=topo)
assert grid._is_grid_valid()
assert grid._is_grid_valid()
assert grid.stage_to_global(stage_id=0, data=0) == 0
assert grid.stage_to_global(stage_id=0, data=1) == 1
assert grid.stage_to_global(stage_id=1, data=0) == 2
assert grid.stage_to_global(stage_id=1, data=1) == 3
assert grid.stage_to_global(stage_id=0, data=0) == 0
assert grid.stage_to_global(stage_id=0, data=1) == 1
assert grid.stage_to_global(stage_id=1, data=0) == 2
assert grid.stage_to_global(stage_id=1, data=1) == 3
me = topo.get_coord(rank=dist.get_rank())
if me.data == 0:
assert grid.stage_to_global(stage_id=0) == 0
assert grid.stage_to_global(stage_id=1) == 2
else:
assert grid.stage_to_global(stage_id=0) == 1
assert grid.stage_to_global(stage_id=1) == 3
me = topo.get_coord(rank=dist.get_rank())
if me.data == 0:
assert grid.stage_to_global(stage_id=0) == 0
assert grid.stage_to_global(stage_id=1) == 2
else:
assert grid.stage_to_global(stage_id=0) == 1
assert grid.stage_to_global(stage_id=1) == 3
def test_primes():

(old decorator-style distributed tests, deleted)

@@ -1,38 +0,0 @@
import torch
import deepspeed.comm as dist
from .common import distributed_test
import pytest
@distributed_test(world_size=3)
def test_init():
assert dist.is_initialized()
assert dist.get_world_size() == 3
assert dist.get_rank() < 3
# Demonstration of pytest's parameterization
@pytest.mark.parametrize('number,color', [(1138, 'purple')])
def test_dist_args(number, color):
"""Outer test function with inputs from pytest.mark.parametrize(). Uses a distributed
helper function.
"""
@distributed_test(world_size=2)
def _test_dist_args_helper(x, color='red'):
assert dist.get_world_size() == 2
assert x == 1138
assert color == 'purple'
"""Ensure that we can parse args to distributed_test decorated functions. """
_test_dist_args_helper(number, color=color)
@distributed_test(world_size=[1, 2, 4])
def test_dist_allreduce():
x = torch.ones(1, 3).cuda() * (dist.get_rank() + 1)
sum_of_ranks = (dist.get_world_size() * (dist.get_world_size() + 1)) // 2
result = torch.ones(1, 3).cuda() * sum_of_ranks
dist.all_reduce(x)
assert torch.all(x == result)