* Integrate the accelerator abstraction interface into deepspeed/

* Fix error message in fp16/fused_optimizer

* Fix error message in fp16/unfused_optimizer.py

* Assign the get_accelerator().pin_memory() result back to the input tensor name (see the sketch after this list)

* Drop the unneeded checks for CUDA availability and nvtx support

* Move try-except into the innermost block

* Call Event() and Stream() through get_accelerator() so they can be used as data types

* Make Stream and Event properties of the abstract interface so they can be used as data types in deepspeed

* Apply the op_builder backend API change from #2705 by @jeffra

* Fix tests where Builder NAME is used

* Keep the original ...Builder.NAME interface instead of the ...Builder().NAME interface

* Fix the builder closure for installation

* Fix the RandomLTD builder

* Add comments clarifying create_op_builder and get_op_builder

* Fix compatibility with pip install -e
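
Below is a minimal sketch of the pin-memory pattern referenced in the list above (the tensor name `buf` and its size are hypothetical); the point is that the pinned copy returned by the accelerator is assigned back to the original name rather than discarded:

```python
import torch
from deepspeed.accelerator import get_accelerator

buf = torch.empty(1024, dtype=torch.float32)
# pin_memory() returns a pinned copy rather than pinning in place,
# so the result must be bound back to the name callers keep using.
buf = get_accelerator().pin_memory(buf)
```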

Co-authored-by: Cheng Li <pistasable@gmail.com>
Co-authored-by: Olatunji Ruwase <olruwase@microsoft.com>
Ma, Guokai 2023-01-26 22:03:12 +08:00 (committed by GitHub)
Parent: ddd48b36ac
Commit: 98cc35b6a8
No key found matching this signature. GPG key ID: 4AEE18F83AFDEB23
59 changed files: 538 additions and 486 deletions


@ -66,12 +66,9 @@ class DeepSpeedAccelerator(ABC):
...
# Streams/Events
@property
@abc.abstractmethod
def Stream(self, device=None, priority=0, **kwargs):
...
@abc.abstractmethod
def StreamContext(self, stream):
def Stream(self):
...
@abc.abstractmethod
@ -86,8 +83,9 @@ class DeepSpeedAccelerator(ABC):
def default_stream(self, device_index=None):
...
@property
@abc.abstractmethod
def Event(self, **kwargs):
def Event(self):
...
# Memory management
@ -221,10 +219,16 @@ class DeepSpeedAccelerator(ABC):
def op_builder_dir(self):
...
# create an instance of op builder, specified by class_name
@abc.abstractmethod
def create_op_builder(self, class_name):
...
# return an op builder class, specified by class_name
@abc.abstractmethod
def get_op_builder(self, class_name):
...
@abc.abstractmethod
def build_extension(self):
...
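
As a usage sketch of the revised interface above (assuming a concrete accelerator such as the CUDA one below is active, and that the builder name passed in exists in its registry): Stream and Event are now properties that return the underlying classes, so they work both as constructors and as data types, while create_op_builder returns a builder instance and get_op_builder returns the builder class.

```python
from deepspeed.accelerator import get_accelerator

acc = get_accelerator()

# Properties returning classes: usable as constructors...
stream = acc.Stream()
event = acc.Event(enable_timing=True)
# ...and as data types, e.g. for isinstance checks or annotations.
assert isinstance(stream, acc.Stream)

# create_op_builder -> builder instance, get_op_builder -> builder class.
# 'TransformerBuilder' is an illustrative name; unknown names return None.
builder_cls = acc.get_op_builder('TransformerBuilder')
builder = acc.create_op_builder('TransformerBuilder')
if builder_cls is not None:
    assert isinstance(builder, builder_cls)
```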


@ -23,7 +23,7 @@ class CUDA_Accelerator(DeepSpeedAccelerator):
for _, module_name, _ in pkgutil.iter_modules([os.path.dirname(op_builder_module.__file__)]):
# avoid self references
if module_name != 'all_ops' and module_name != 'builder' and module_name != 'builder_names':
if module_name != 'all_ops' and module_name != 'builder':
module = importlib.import_module("{}.{}".format(
op_builder_dir,
module_name))
@ -88,11 +88,9 @@ class CUDA_Accelerator(DeepSpeedAccelerator):
return torch.cuda.default_generators[device_index]
# Streams/Events
def Stream(self, device=None, priority=0, **kwargs):
return torch.cuda.Stream(device, priority, **kwargs)
def StreamContext(self, stream):
return torch.cuda.StreamContext(stream)
@property
def Stream(self):
return torch.cuda.Stream
def stream(self, stream):
return torch.cuda.stream(stream)
@ -103,8 +101,9 @@ class CUDA_Accelerator(DeepSpeedAccelerator):
def default_stream(self, device_index=None):
return torch.cuda.default_stream(device_index)
def Event(self, **kwargs):
return torch.cuda.Event(**kwargs)
@property
def Event(self):
return torch.cuda.Event
# Memory management
def empty_cache(self):
@ -234,12 +233,20 @@ class CUDA_Accelerator(DeepSpeedAccelerator):
# this dict will be filled at init stage
class_dict = {}
# create an instance of op builder and return, name specified by class_name
def create_op_builder(self, class_name):
if class_name in self.class_dict:
return self.class_dict[class_name]()
else:
return None
# return an op builder class, name specified by class_name
def get_op_builder(self, class_name):
if class_name in self.class_dict:
return self.class_dict[class_name]
else:
return None
def build_extension(self):
from torch.utils.cpp_extension import BuildExtension
return BuildExtension
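
For context on the call-site changes in the files that follow, here is a before/after sketch of the builder access pattern (the old form is reconstructed from the removed lines in these diffs):

```python
# Old pattern: route through the accelerator using a name class from
# deepspeed.ops.op_builder.builder_names:
#   from deepspeed.ops.op_builder.builder_names import AsyncIOBuilder
#   aio_op = get_accelerator().create_op_builder(AsyncIOBuilder).load()

# New pattern: import the builder directly from deepspeed.ops.op_builder and
# instantiate it; the accelerator-specific implementation is resolved behind
# that package.
from deepspeed.ops.op_builder import AsyncIOBuilder

if AsyncIOBuilder().is_compatible():
    aio_op = AsyncIOBuilder().load()
```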


@ -15,8 +15,7 @@ import shutil
from test_ds_aio_utils import refine_integer_value
from perf_sweep_utils import READ_OP_DESC, WRITE_OP_DESC, BENCH_LOG_DIR, \
READ_IO_DIR, WRITE_IO_DIR, READ_LOG_DIR, WRITE_LOG_DIR
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import AsyncIOBuilder
from deepspeed.ops.op_builder import AsyncIOBuilder
OTHER_OPTIONS = '--handle'
PERF_SCRIPT = 'test_ds_aio.py'
@ -279,7 +278,7 @@ def script_path():
def async_io_setup():
return get_accelerator().create_op_builder(AsyncIOBuilder).is_compatible()
return AsyncIOBuilder().is_compatible()
def get_block_size_and_count(io_bytes):


@ -11,7 +11,7 @@ import time
from multiprocessing import Pool, Barrier
from test_ds_aio_utils import report_results, task_log, task_barrier
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import AsyncIOBuilder
from deepspeed.ops.op_builder import AsyncIOBuilder
def pre_basic(args, tid, read_op):
@ -60,14 +60,13 @@ def post_basic(pool_params):
def main_basic_read(pool_params):
args, tid, ctxt = pool_params
start_time = time.time()
get_accelerator().create_op_builder(AsyncIOBuilder).load().aio_read(
ctxt['buffer'],
ctxt['file'],
args.block_size,
args.queue_depth,
args.single_submit,
args.overlap_events,
args.validate)
AsyncIOBuilder().load().aio_read(ctxt['buffer'],
ctxt['file'],
args.block_size,
args.queue_depth,
args.single_submit,
args.overlap_events,
args.validate)
end_time = time.time()
ctxt['elapsed_sec'] += end_time - start_time
@ -77,14 +76,13 @@ def main_basic_read(pool_params):
def main_basic_write(pool_params):
args, tid, ctxt = pool_params
start_time = time.time()
get_accelerator().create_op_builder(AsyncIOBuilder).load().aio_write(
ctxt['buffer'],
ctxt['file'],
args.block_size,
args.queue_depth,
args.single_submit,
args.overlap_events,
args.validate)
AsyncIOBuilder().load().aio_write(ctxt['buffer'],
ctxt['file'],
args.block_size,
args.queue_depth,
args.single_submit,
args.overlap_events,
args.validate)
end_time = time.time()
ctxt['elapsed_sec'] += end_time - start_time


@ -11,7 +11,7 @@ import time
from multiprocessing import Pool, Barrier
from test_ds_aio_utils import report_results, task_log, task_barrier
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import AsyncIOBuilder
from deepspeed.ops.op_builder import AsyncIOBuilder
def pre_handle(args, tid, read_op):
@ -35,12 +35,11 @@ def pre_handle(args, tid, read_op):
)
io_parallel = args.io_parallel if args.io_parallel else 1
handle = get_accelerator().create_op_builder(AsyncIOBuilder).load().aio_handle(
args.block_size,
args.queue_depth,
args.single_submit,
args.overlap_events,
io_parallel)
handle = AsyncIOBuilder().load().aio_handle(args.block_size,
args.queue_depth,
args.single_submit,
args.overlap_events,
io_parallel)
task_log(tid, f'created deepspeed aio handle')
ctxt = {}


@ -4,6 +4,5 @@ Licensed under the MIT license.
Functionality of swapping optimizer tensors to/from (NVMe) storage devices.
"""
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import AsyncIOBuilder
assert get_accelerator().create_op_builder(AsyncIOBuilder).is_compatible()
from deepspeed.ops.op_builder import AsyncIOBuilder
assert AsyncIOBuilder().is_compatible()


@ -1,6 +1,5 @@
import shutil
import subprocess
import torch
import time
import datetime
import math
@ -16,6 +15,7 @@ from .constants import *
from .scheduler import ResourceManager
from .tuner import GridSearchTuner, RandomTuner, ModelBasedTuner
from .utils import *
from deepspeed.accelerator import get_accelerator
try:
from tabulate import tabulate
@ -279,7 +279,7 @@ class Autotuner:
return False
def get_gpu_memory_info(self):
return torch.cuda.get_device_properties(0).total_memory
return get_accelerator().total_memory()
def get_activation_memory_per_gpu(self):
if self.model_info and "activation_mem_per_gpu" in self.model_info:


@ -20,6 +20,7 @@ from ..comm.comm import init_distributed
from ..pipe import PipelineModule
from ..moe.utils import has_moe_layers
from ..module_inject import LinearAllreduce, LinearLayer, Normalize, ReplaceWithTensorSlicing
from deepspeed.accelerator import get_accelerator
from ..module_inject.policy import TransformerPolicy
from ..module_inject.auto_tp import AutoTP
@ -85,7 +86,7 @@ class InferenceEngine(Module):
# This is a hack to remove the prepare_mask function on HF side for BLOOM architecture
self.remove_mask_prepare_for_bloom()
if config.enable_cuda_graph:
if get_accelerator().device_name() == 'cuda' and config.enable_cuda_graph:
assert pkg_version.parse(torch.__version__) >= pkg_version.parse("1.10"), \
"If you want to use cuda graph, please upgrade torch to at least v1.10"
@ -135,13 +136,14 @@ class InferenceEngine(Module):
config.injection_policy_tuple = injection_policy
self._apply_injection_policy(config, client_module)
device = torch.cuda.current_device()
device = get_accelerator().current_device_name()
self.module.to(device)
if config.tensor_parallel.tp_size > 1:
_rng_state = torch.cuda.get_rng_state().to(torch.cuda.current_device())
_rng_state = get_accelerator().get_rng_state().to(
get_accelerator().current_device_name())
dist.broadcast(_rng_state, 0)
torch.cuda.set_rng_state(_rng_state.cpu())
get_accelerator().set_rng_state(_rng_state.cpu())
if config.tensor_parallel.tp_size > 1:
assert not config.enable_cuda_graph, "Cuda graph is not supported for model parallelism"
@ -171,7 +173,7 @@ class InferenceEngine(Module):
if self.use_cuda_events:
self.timers(INFERENCE_MODEL_TIMER).start()
else:
torch.cuda.synchronize()
get_accelerator().synchronize()
self._start = time.time()
def _post_forward_hook(self, module, input, output):
@ -179,7 +181,7 @@ class InferenceEngine(Module):
self.timers(INFERENCE_MODEL_TIMER).stop()
elapsed_time = self.timers(INFERENCE_MODEL_TIMER).elapsed(reset=True)
else:
torch.cuda.synchronize()
get_accelerator().synchronize()
self._end = time.time()
elapsed_time = self._end - self._start
self._model_times.append(elapsed_time)
@ -189,7 +191,7 @@ class InferenceEngine(Module):
if InferenceEngine.inference_mp_group is None:
init_distributed()
local_rank = int(os.getenv('LOCAL_RANK', '0'))
torch.cuda.set_device(local_rank)
get_accelerator().set_device(local_rank)
ranks = [i for i in range(config.tensor_parallel.tp_size)]
self.mp_group = dist.new_group(ranks)
@ -300,7 +302,7 @@ class InferenceEngine(Module):
state_dict[prefix + 'bias'])
else:
data = state_dict[prefix + 'bias']
data = data.to(torch.cuda.current_device())
data = data.to(get_accelerator().current_device_name())
module.bias = self.mp_replace.copy(module.bias, data)
layer_policies = {
@ -402,7 +404,8 @@ class InferenceEngine(Module):
for i in range(1, len(sd_loader)):
if not dist.is_initialized() or dist.get_rank() == 0:
print(f"loading checkpoint ({i})")
self.sd = torch.load(sd_loader[i], map_location='cuda')
self.sd = torch.load(sd_loader[i],
map_location=get_accelerator().device_name())
self.key_list = list(self.sd.keys())
self.load_model_with_checkpoint(self.module)
else:
@ -463,12 +466,12 @@ class InferenceEngine(Module):
def _create_cuda_graph(self, *inputs, **kwargs):
# warmup to create the workspace and cublas handle
cuda_stream = torch.cuda.Stream()
cuda_stream.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(cuda_stream):
cuda_stream = get_accelerator().Stream()
cuda_stream.wait_stream(get_accelerator().current_stream())
with get_accelerator().stream(cuda_stream):
for i in range(3):
ret = self.module(*inputs, **kwargs)
torch.cuda.current_stream().wait_stream(cuda_stream)
get_accelerator().current_stream().wait_stream(cuda_stream)
# create cuda_graph and assign static_inputs and static_outputs
self._cuda_graphs = torch.cuda.CUDAGraph()
@ -510,11 +513,12 @@ class InferenceEngine(Module):
**kwargs: variable length keyword arguments
"""
start = None
if self.model_profile_enabled and self._config.enable_cuda_graph:
torch.cuda.synchronize()
if self.model_profile_enabled and get_accelerator().device_name(
) == 'cuda' and self._config.enable_cuda_graph:
get_accelerator().synchronize()
start = time.time()
if self._config.enable_cuda_graph:
if get_accelerator().device_name() == 'cuda' and self._config.enable_cuda_graph:
if self.cuda_graph_created:
outputs = self._graph_replay(*inputs, **kwargs)
else:
@ -524,7 +528,7 @@ class InferenceEngine(Module):
outputs = self.module(*inputs, **kwargs)
if self.model_profile_enabled and self._config.enable_cuda_graph:
torch.cuda.synchronize()
get_accelerator().synchronize()
duration = time.time() - start
self._model_times.append(duration)
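
A condensed sketch of the guard introduced in this hunk (function and attribute names are illustrative): CUDA-graph capture and the associated synchronization only run when the active accelerator reports itself as 'cuda', so other backends fall through to a plain forward call.

```python
import time
from deepspeed.accelerator import get_accelerator

def timed_forward(module, config, *inputs, **kwargs):
    # Guard CUDA-only behaviour on the accelerator name, not on the config alone.
    cuda_graph_enabled = (get_accelerator().device_name() == 'cuda'
                          and config.enable_cuda_graph)
    if cuda_graph_enabled:
        get_accelerator().synchronize()
    start = time.time()
    outputs = module(*inputs, **kwargs)
    if cuda_graph_enabled:
        get_accelerator().synchronize()
    return outputs, time.time() - start
```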


@ -5,7 +5,7 @@ import subprocess
import warnings
from shlex import split
from abc import ABC, abstractmethod
from deepspeed.accelerator import get_accelerator
from ..utils import logger
from .constants import PDSH_MAX_FAN_OUT, MVAPICH_TMP_HOSTFILE
@ -220,7 +220,8 @@ class MVAPICHRunner(MultiNodeRunner):
self.add_export('MV2_DEBUG_SHOW_BACKTRACE', '1')
# Enabled cuda-aware communication
self.add_export('MV2_USE_CUDA', '1')
if get_accelerator().device_name() == 'cuda':
self.add_export('MV2_USE_CUDA', '1')
# Support deep learning frameworks: http://hidl.cse.ohio-state.edu/userguide/horovod/
self.add_export('MV2_SUPPORT_DL', '1')


@ -17,7 +17,6 @@ import collections
from copy import deepcopy
import signal
import time
import torch.cuda
from .multinode_runner import PDSHRunner, OpenMPIRunner, MVAPICHRunner, SlurmRunner
from .constants import PDSH_LAUNCHER, OPENMPI_LAUNCHER, MVAPICH_LAUNCHER, SLURM_LAUNCHER
@ -26,6 +25,7 @@ from ..nebula.constants import NEBULA_EXPORT_ENVS
from ..utils import logger
from ..autotuning import Autotuner
from deepspeed.accelerator import get_accelerator
DLTS_HOSTFILE = "/job/hostfile"
EXPORT_ENVS = ['MLFLOW', 'NCCL', 'PYTHON', 'MV2', 'UCX']
@ -406,7 +406,7 @@ def main(args=None):
multi_node_exec = True
if not resource_pool:
resource_pool = {}
device_count = torch.cuda.device_count()
device_count = get_accelerator().device_count()
if device_count == 0:
raise RuntimeError("Unable to proceed, no GPU resources available")
resource_pool['localhost'] = device_count


@ -2,6 +2,7 @@
Copyright 2022 The Microsoft DeepSpeed Team
'''
import torch
from deepspeed.accelerator import get_accelerator
class DSClipEncoder(torch.nn.Module):
@ -25,7 +26,7 @@ class DSClipEncoder(torch.nn.Module):
seq_len,
seq_len,
dtype=dtype,
device=torch.cuda.current_device())
device=get_accelerator().current_device_name())
mask.fill_(torch.tensor(torch.finfo(dtype).min))
mask.triu_(1)
mask = mask.unsqueeze(1)


@ -3,13 +3,14 @@ Copyright 2022 The Microsoft DeepSpeed Team
'''
import torch
from deepspeed.ops import op_builder
import torch.nn as nn
from deepspeed import comm as dist
from deepspeed.utils.logging import log_dist
from deepspeed.ops.transformer.inference.ds_mlp import DeepSpeedMLP
from deepspeed.ops.transformer.inference.ds_attention import DeepSpeedSelfAttention, BloomSelfAttention
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder import InferenceBuilder
inference_cuda_module = None
@ -49,7 +50,7 @@ class DeepSpeedTransformerInference(nn.Module):
data_type = torch.half if config.fp16 else torch.float
global inference_cuda_module
if inference_cuda_module is None:
builder = op_builder.InferenceBuilder()
builder = InferenceBuilder()
inference_cuda_module = builder.load()
if DeepSpeedTransformerInference.layer_id == 1:
@ -76,7 +77,8 @@ class DeepSpeedTransformerInference(nn.Module):
merge_count,
mlp_extra_grouping)
device = torch.cuda.current_device() #if config.bigscience_bloom else 'cpu'
device = get_accelerator().current_device_name(
) # if config.bigscience_bloom else 'cpu'
self.norm_w = nn.Parameter(torch.empty(self.config.hidden_size,
dtype=data_type,
device=device),


@ -4,6 +4,7 @@ from torch import nn
from torch.nn import functional as F
from torch.nn.parameter import Parameter
from deepspeed.accelerator import get_accelerator
class LinearAllreduce(nn.Module):
@ -32,12 +33,12 @@ class LinearLayer(nn.Module):
self.weight = Parameter(
torch.empty(weight_shape,
dtype=dtype,
device=torch.cuda.current_device()))
device=get_accelerator().current_device_name()))
self.bias = Parameter(
torch.empty(weight_shape[0],
dtype=dtype,
device=torch.cuda.current_device())) \
device=get_accelerator().current_device_name())) \
if bias is not None else None
def forward(self, input):
@ -50,7 +51,9 @@ class LinearLayer(nn.Module):
class Normalize(nn.Module):
def __init__(self, dim, dtype=torch.float, eps=1e-5):
super(Normalize, self).__init__()
self.norm = nn.LayerNorm(dim, eps=eps).to(dtype).to(torch.cuda.current_device())
self.norm = nn.LayerNorm(dim,
eps=eps).to(dtype).to(
get_accelerator().current_device_name())
self.weight = self.norm.weight
self.bias = self.norm.bias
@ -65,7 +68,7 @@ class EmbeddingLayer(nn.Module):
torch.empty(weight_shape[0],
weight_shape[1],
dtype=dtype,
device=torch.cuda.current_device()))
device=get_accelerator().current_device_name()))
def forward(self, input):
return F.embedding(input, self.weight)


@ -9,6 +9,7 @@ import deepspeed.ops.transformer as transformer_inference
from .layers import LinearLayer, Normalize, EmbeddingLayer, OPTEmbedding
import torch
import gc
from deepspeed.accelerator import get_accelerator
def load_model_with_checkpoint(r_module,
@ -49,9 +50,10 @@ def load_model_with_checkpoint(r_module,
if type(sd[0][prefix + n]) is list:
tmp_data, scale = sd[0][prefix + n]
tmp_data = tmp_data
scale = scale.to(torch.cuda.current_device())
scale = scale.to(get_accelerator().current_device_name())
else:
tmp_data = sd[0][prefix + n].to(torch.cuda.current_device())
tmp_data = sd[0][prefix + n].to(
get_accelerator().current_device_name())
scale = None
src_shape = tmp_data.shape
dst_shape = p.shape
@ -77,7 +79,8 @@ def load_model_with_checkpoint(r_module,
weight_partition = torch.split(
tmp_data,
dst_shape[dim1],
dim=dim)[rank].to(torch.cuda.current_device())
dim=dim)[rank].to(
get_accelerator().current_device_name())
assert tmp_data.dtype != torch.int8 or scale.numel() > weight_quantizer.num_groups * (rank+1), \
'''ERROR: We require the quantization scales for larger TP-size when loading INT8 checkpoint!\
Please use the FP16 checkpoint to generate INT8 checkpoint with the sharding parameters!'''
@ -93,17 +96,19 @@ def load_model_with_checkpoint(r_module,
all_data = [
sd[j][prefix +
n] if type(sd[j][prefix + n]) is list else
sd[j][prefix + n].to(torch.cuda.current_device())
sd[j][prefix + n].to(
get_accelerator().current_device_name())
for j in range(len(sd))
]
weight_partition = torch.cat([
ad[0].to(torch.cuda.current_device())
ad[0].to(get_accelerator().current_device_name())
if type(ad) is list else ad for ad in all_data
],
dim=dim)
if tmp_data.dtype == torch.int8:
scale = torch.cat([
ad[1].to(torch.cuda.current_device())
ad[1].to(
get_accelerator().current_device_name())
for ad in all_data
],
dim=dim)
@ -126,15 +131,15 @@ def load_model_with_checkpoint(r_module,
if src_shape[0] > dst_shape[0]:
bias_split = torch.split(
tmp_data,
dst_shape[-1])[rank].to(
torch.cuda.current_device()).contiguous()
dst_shape[-1])[rank].to(get_accelerator(
).current_device_name()).contiguous()
p.data.copy_(bias_split)
else:
p.data.copy_(
torch.cat(
[sd[j][prefix + n] for j in range(len(sd))],
dim=0).to(torch.cuda.current_device()).
contiguous())
dim=0).to(get_accelerator(
).current_device_name()).contiguous())
load_parameters(module, prefix)
for n, child in module.named_children():


@ -6,6 +6,7 @@ import deepspeed.ops.transformer as transformer_inference
from deepspeed.ops.transformer.inference.diffusers_attention import DeepSpeedDiffusersAttention
from deepspeed.ops.transformer.inference.diffusers_transformer_block import DeepSpeedDiffusersTransformerBlock
from deepspeed.ops.transformer.inference.diffusers_2d_transformer import Diffusers2DTransformerConfig
from deepspeed.accelerator import get_accelerator
from .replace_policy import HFGPT2LayerPolicy
from .replace_policy import replace_policies, generic_policies
@ -63,10 +64,10 @@ class ReplaceWithTensorSlicing:
axis=self.out_dim) for i in range(len(qkv_split[0]))
]
dst.data.copy_(weight_split[self.gpu_index].to(
torch.cuda.current_device()).contiguous())
get_accelerator().current_device_name()).contiguous())
else:
dst.data.copy_(src_split[self.gpu_index].to(
torch.cuda.current_device()).contiguous())
get_accelerator().current_device_name()).contiguous())
else:
if src_shape[0] == dst_shape[0]:
return torch.nn.parameter.Parameter(src)
@ -78,10 +79,10 @@ class ReplaceWithTensorSlicing:
axis=0) for i in range(len(qkv_split[0]))
]
dst.data.copy_(bias_split[self.gpu_index].to(
torch.cuda.current_device()).contiguous())
get_accelerator().current_device_name()).contiguous())
else:
dst.data.copy_(src_split[self.gpu_index].to(
torch.cuda.current_device()).contiguous())
get_accelerator().current_device_name()).contiguous())
return torch.nn.parameter.Parameter(dst)
@ -101,22 +102,23 @@ class ReplaceWithTensorSlicing:
src,
dst_shape[self.in_dim],
dim=self.in_dim)[self.gpu_index].to(
torch.cuda.current_device()).contiguous()
get_accelerator().current_device_name()).contiguous()
else:
self.merge_assert(src_shape[self.out_dim], dst_shape[self.out_dim])
weight_split = torch.split(
src.data,
dst_shape[self.out_dim],
dim=self.out_dim)[self.gpu_index].to(
torch.cuda.current_device()).contiguous()
get_accelerator().current_device_name()).contiguous()
dst.data.copy_(weight_split.contiguous())
else:
if src_shape[0] == dst_shape[0]:
dst.data.copy_(src)
else:
bias_split = torch.split(src.data,
dst_shape[-1])[self.gpu_index].to(
torch.cuda.current_device()).contiguous()
bias_split = torch.split(
src.data,
dst_shape[-1])[self.gpu_index].to(
get_accelerator().current_device_name()).contiguous()
dst.data.copy_(bias_split)
dst = torch.nn.parameter.Parameter(dst, requires_grad=False)
if hasattr(src, 'scale'):
@ -154,7 +156,7 @@ class GroupQuantizer:
return inputs
q_range = 2**self.num_bits
num_groups = inputs.shape[0] // self.group_size
inputs = inputs.to(torch.cuda.current_device())
inputs = inputs.to(get_accelerator().current_device_name())
input_flat = inputs.reshape(num_groups, -1).contiguous()
input_min = torch.min(input_flat, dim=1, keepdim=True)[0].float()
input_max = torch.max(input_flat, dim=1, keepdim=True)[0].float()
@ -223,7 +225,7 @@ def generic_injection(module, fp16=False, enable_cuda_graph=True):
data = data.contiguous()
data.reshape(-1).copy_(data.transpose(-1, -2).contiguous().reshape(-1))
data = data.reshape(data.shape[-1], data.shape[-2])
data.to(torch.cuda.current_device())
data.to(get_accelerator().current_device_name())
return data
if len(policy_attn) == 5:
@ -236,7 +238,8 @@ def generic_injection(module, fp16=False, enable_cuda_graph=True):
attn_module.attn_qkvb = None
attn_module.attn_ow.data = transpose(attn_ow.data)
attn_module.attn_ob.data.copy_(attn_ob.data.to(torch.cuda.current_device()))
attn_module.attn_ob.data.copy_(
attn_ob.data.to(get_accelerator().current_device_name()))
return attn_module
def replace_attn_block(child, policy):
@ -421,7 +424,7 @@ def replace_transformer_layer(orig_layer_impl,
if child.bias is not None:
new_bias.data.copy_(child.bias.data)
return LinearAllreduce(data, child.bias if child.bias is None else \
torch.nn.parameter.Parameter(new_bias.to(torch.cuda.current_device())), mp_group)
torch.nn.parameter.Parameter(new_bias.to(get_accelerator().current_device_name())), mp_group)
else:
new_weight = torch.empty((
(weight_shape[1] if conv_linear_layer else weight_shape[0]) //
@ -439,8 +442,9 @@ def replace_transformer_layer(orig_layer_impl,
dtype=child.weight.dtype)
bias_data = None if child.bias is None else mp_replace.copy(
new_bias,
child.bias.data).to(torch.cuda.current_device())
return LinearLayer(weight=data.to(torch.cuda.current_device()),
child.bias.data).to(get_accelerator().current_device_name())
return LinearLayer(weight=data.to(
get_accelerator().current_device_name()),
bias=bias_data)
def _slice_embedding(child, name, conv_linear_layer):


@ -3,8 +3,7 @@ Copyright 2020 The Microsoft DeepSpeed Team
'''
import torch
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import CPUAdagradBuilder
from deepspeed.ops.op_builder import CPUAdagradBuilder
from deepspeed.utils.logging import should_log_le
@ -25,8 +24,7 @@ class DeepSpeedCPUAdagrad(torch.optim.Optimizer):
self.opt_id = DeepSpeedCPUAdagrad.optimizer_id
DeepSpeedCPUAdagrad.optimizer_id = DeepSpeedCPUAdagrad.optimizer_id + 1
self.fp32_optimizer_states = fp32_optimizer_states
self.ds_opt_adagrad = get_accelerator().create_op_builder(
CPUAdagradBuilder).load()
self.ds_opt_adagrad = CPUAdagradBuilder().load()
self.ds_opt_adagrad.create_adagrad(self.opt_id,
lr,


@ -6,8 +6,7 @@ import torch
from cpuinfo import get_cpu_info
from deepspeed.utils import logger
from deepspeed.utils.logging import should_log_le
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import CPUAdamBuilder
from deepspeed.ops.op_builder import CPUAdamBuilder
class DeepSpeedCPUAdam(torch.optim.Optimizer):
@ -92,7 +91,7 @@ class DeepSpeedCPUAdam(torch.optim.Optimizer):
DeepSpeedCPUAdam.optimizer_id = DeepSpeedCPUAdam.optimizer_id + 1
self.adam_w_mode = adamw_mode
self.fp32_optimizer_states = fp32_optimizer_states
self.ds_opt_adam = get_accelerator().create_op_builder(CPUAdamBuilder).load()
self.ds_opt_adam = CPUAdamBuilder().load()
self.ds_opt_adam.create_adam(self.opt_id,
lr,


@ -10,7 +10,7 @@ from .multi_tensor_apply import MultiTensorApply
multi_tensor_applier = MultiTensorApply(2048 * 32)
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import FusedAdamBuilder
from deepspeed.ops.op_builder import FusedAdamBuilder
class FusedAdam(torch.optim.Optimizer):
@ -70,7 +70,7 @@ class FusedAdam(torch.optim.Optimizer):
self.adam_w_mode = 1 if adam_w_mode else 0
self.set_grad_none = set_grad_none
fused_adam_cuda = get_accelerator().create_op_builder(FusedAdamBuilder).load()
fused_adam_cuda = FusedAdamBuilder().load()
# Skip buffer
self._dummy_overflow_buf = get_accelerator().IntTensor([0])
self.multi_tensor_adam = fused_adam_cuda.multi_tensor_adam


@ -6,8 +6,7 @@ This file is adapted from NVIDIA/apex/optimizer/fused_adam and implements the LA
'''
import types
import torch
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import FusedLambBuilder
from deepspeed.ops.op_builder import FusedLambBuilder
class FusedLamb(torch.optim.Optimizer):
@ -49,8 +48,7 @@ class FusedLamb(torch.optim.Optimizer):
max_coeff=10.0,
min_coeff=0.01,
amsgrad=False):
self.fused_lamb_cuda = get_accelerator().create_op_builder(
FusedLambBuilder).load()
self.fused_lamb_cuda = FusedLambBuilder().load()
if amsgrad:
raise RuntimeError('FusedLamb does not support the AMSGrad variant.')


@ -3,8 +3,7 @@ Copyright 2020 The Microsoft DeepSpeed Team
'''
import torch
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import QuantizerBuilder
from deepspeed.ops.op_builder import QuantizerBuilder
# Cuda modules will be imported if needed
quantizer_cuda_module = None
@ -14,8 +13,7 @@ def ds_quantizer(input, groups=1, bit_num=8, sr=False, asym=False):
# Load cuda modules if needed
global quantizer_cuda_module
if quantizer_cuda_module is None:
quantizer_cuda_module = get_accelerator().create_op_builder(
QuantizerBuilder).load()
quantizer_cuda_module = QuantizerBuilder().load()
if sr:
if asym:
quantize_func = quantizer_cuda_module.ds_sr_quantize_asym_fp16 if input.dtype == torch.half else quantizer_cuda_module.ds_sr_quantize_asym_fp32


@ -3,8 +3,7 @@ Copyright 2022 The Microsoft DeepSpeed Team
"""
import torch
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import RandomLTDBuilder
from deepspeed.ops.op_builder import RandomLTDBuilder
"""
Returns:
sampled_indices: [layers, batch_size, reserved_length]
@ -29,7 +28,7 @@ def gpt_sample_tokens(reserved_length: int,
reserved_length).to(torch.int32)
global random_ltd_module
if random_ltd_module is None:
random_ltd_module = get_accelerator().create_op_builder(RandomLTDBuilder).load()
random_ltd_module = RandomLTDBuilder().load()
sampled_indices = random_ltd_module.token_sort_(sampled_indices, seq_length)
# Not certain the optimized kernel is actually better here, cause it kind of screws
@ -65,7 +64,7 @@ def bert_sample_tokens(reserved_length: int,
reserved_length).to(torch.int32)
global random_ltd_module
if random_ltd_module is None:
random_ltd_module = get_accelerator().create_op_builder(RandomLTDBuilder).load()
random_ltd_module = RandomLTDBuilder().load()
sampled_indices = random_ltd_module.token_sort_(sampled_indices, seq_length)
dtype = sampled_indices.dtype
@ -90,8 +89,7 @@ class GatherTokens(torch.autograd.Function):
batch_first: bool):
global random_ltd_module
if random_ltd_module is None:
random_ltd_module = get_accelerator().create_op_builder(
RandomLTDBuilder).load()
random_ltd_module = RandomLTDBuilder().load()
ctx.save_for_backward(activations, sorted_indices)
ctx.batch_first = batch_first
return activations, random_ltd_module.token_gather(activations, sorted_indices, batch_first)
@ -102,8 +100,7 @@ class GatherTokens(torch.autograd.Function):
g_gradients = g_gradients.contiguous()
global random_ltd_module
if random_ltd_module is None:
random_ltd_module = get_accelerator().create_op_builder(
RandomLTDBuilder).load()
random_ltd_module = RandomLTDBuilder().load()
activations, sorted_indices = ctx.saved_tensors
batch_first = ctx.batch_first
@ -122,8 +119,7 @@ class ScatterTokens(torch.autograd.Function):
batch_first: bool):
global random_ltd_module
if random_ltd_module is None:
random_ltd_module = get_accelerator().create_op_builder(
RandomLTDBuilder).load()
random_ltd_module = RandomLTDBuilder().load()
scatter_results = random_ltd_module.token_scatter_(all_activations.clone(),
layer_activations,
sorted_indices,
@ -139,8 +135,7 @@ class ScatterTokens(torch.autograd.Function):
out_gradients = out_gradients.contiguous()
global random_ltd_module
if random_ltd_module is None:
random_ltd_module = get_accelerator().create_op_builder(
RandomLTDBuilder).load()
random_ltd_module = RandomLTDBuilder().load()
sorted_indices, = ctx.saved_tensors
batch_first = ctx.batch_first


@ -4,8 +4,7 @@ Copyright 2022 The Microsoft DeepSpeed Team
from typing import Optional
import torch
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import SpatialInferenceBuilder
from deepspeed.ops.op_builder import SpatialInferenceBuilder
spatial_cuda_module = None
@ -16,8 +15,7 @@ def nhwc_bias_add(activation: torch.Tensor,
other_bias: Optional[torch.Tensor] = None) -> torch.Tensor:
global spatial_cuda_module
if spatial_cuda_module is None:
spatial_cuda_module = get_accelerator().create_op_builder(
SpatialInferenceBuilder).load()
spatial_cuda_module = SpatialInferenceBuilder().load()
if other is None:
return spatial_cuda_module.nhwc_bias_add(activation, bias)


@ -8,7 +8,7 @@ import torch.nn as nn
from packaging import version as pkg_version
from deepspeed.utils.logging import log_dist
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import InferenceBuilder
from deepspeed.ops.op_builder import InferenceBuilder
# Cuda modules will be imported if needed
inference_cuda_module = None
@ -150,7 +150,7 @@ class DeepSpeedDiffusersAttention(nn.Module):
data_type_fp = torch.half if config.fp16 else torch.float
global inference_cuda_module
if inference_cuda_module is None:
builder = get_accelerator().create_op_builder(InferenceBuilder)
builder = InferenceBuilder()
inference_cuda_module = builder.load()
if DeepSpeedDiffusersAttention.layer_id == 1:


@ -9,8 +9,7 @@ from deepspeed import module_inject
from .diffusers_attention import DeepSpeedDiffusersAttention
from .bias_add import nhwc_bias_add
from .diffusers_2d_transformer import Diffusers2DTransformerConfig
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import InferenceBuilder, SpatialInferenceBuilder
from deepspeed.ops.op_builder import InferenceBuilder, SpatialInferenceBuilder
# Ops will be loaded on demand
transformer_cuda_module = None
@ -20,16 +19,14 @@ spatial_cuda_module = None
def load_transformer_module():
global transformer_cuda_module
if transformer_cuda_module is None:
transformer_cuda_module = get_accelerator().create_op_builder(
InferenceBuilder).load()
transformer_cuda_module = InferenceBuilder().load()
return transformer_cuda_module
def load_spatial_module():
global spatial_cuda_module
if spatial_cuda_module is None:
spatial_cuda_module = get_accelerator().create_op_builder(
SpatialInferenceBuilder).load()
spatial_cuda_module = SpatialInferenceBuilder().load()
return spatial_cuda_module


@ -15,7 +15,7 @@ from .config import DeepSpeedInferenceConfig
from ....moe.sharded_moe import TopKGate
from deepspeed import comm as dist
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import InferenceBuilder
from deepspeed.ops.op_builder import InferenceBuilder
class DeepSpeedMoEInferenceConfig(DeepSpeedInferenceConfig):
@ -248,8 +248,7 @@ class DeepSpeedMoEInference(nn.Module):
inference_cuda_module = builder.load()
specialized_mode = True
else:
inference_cuda_module = get_accelerator().create_op_builder(
InferenceBuilder).load()
inference_cuda_module = InferenceBuilder().load()
self.config.specialized_mode = specialized_mode
DeepSpeedMoEInference.layer_id += 1


@ -1,7 +1,8 @@
import torch
import deepspeed
from ..config import DeepSpeedInferenceConfig
from deepspeed.ops.op_builder import InferenceBuilder
class BaseOp(torch.nn.Module):
inference_cuda_module = None
@ -10,5 +11,5 @@ class BaseOp(torch.nn.Module):
super(BaseOp, self).__init__()
self.config = config
if BaseOp.inference_cuda_module is None:
builder = deepspeed.ops.op_builder.InferenceBuilder()
builder = InferenceBuilder()
BaseOp.inference_cuda_module = builder.load()


@ -7,7 +7,7 @@ import torch
from torch import nn
from torch.autograd import Function
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import TransformerBuilder, StochasticTransformerBuilder
from deepspeed.ops.op_builder import TransformerBuilder, StochasticTransformerBuilder
# Cuda modules will be imported if needed
transformer_cuda_module = None
@ -531,11 +531,9 @@ class DeepSpeedTransformerLayer(nn.Module):
# Load cuda modules if needed
global transformer_cuda_module, stochastic_transformer_cuda_module
if transformer_cuda_module is None and not self.config.stochastic_mode:
transformer_cuda_module = get_accelerator().create_op_builder(
TransformerBuilder).load()
transformer_cuda_module = TransformerBuilder().load()
if stochastic_transformer_cuda_module is None and self.config.stochastic_mode:
stochastic_transformer_cuda_module = get_accelerator().create_op_builder(
StochasticTransformerBuilder).load()
stochastic_transformer_cuda_module = StochasticTransformerBuilder().load()
# create the layer in cuda kernels.
cuda_module = stochastic_transformer_cuda_module if self.config.stochastic_mode else transformer_cuda_module


@ -6,6 +6,7 @@ from functools import partial
from typing import List, Optional
from collections import OrderedDict
import numpy as np
from deepspeed.accelerator import get_accelerator
Tensor = torch.Tensor
@ -98,7 +99,7 @@ class FlopsProfiler(object):
module.__post_hook_handle__ = module.register_forward_hook(post_hook)
def start_time_hook(module, input):
torch.cuda.synchronize()
get_accelerator().synchronize()
module.__start_time__ = time.time()
if not hasattr(module, "__start_time_hook_handle"):
@ -106,7 +107,7 @@ class FlopsProfiler(object):
start_time_hook)
def end_time_hook(module, input, output):
torch.cuda.synchronize()
get_accelerator().synchronize()
module.__duration__ += time.time() - module.__start_time__
if not hasattr(module, "__end_time_hook_handle__"):


@ -20,12 +20,12 @@ from deepspeed import comm as dist
import mmap
from torch import _C
from torch.cuda import _lazy_call, device as device_ctx_manager
from deepspeed.runtime.config import DeepSpeedConfig
from deepspeed.utils import logger
from deepspeed.runtime.utils import copy_to_device, move_to_device, see_memory_usage, bwc_tensor_model_parallel_rank
from deepspeed.utils.timer import SynchronizedWallClockTimer as Timers
from deepspeed.accelerator import get_accelerator
# DeepSpeed Checkpointing Enabled or Disabled
deepspeed_checkpointing_enabled = False
@ -98,25 +98,25 @@ def _set_cuda_rng_state(new_state, device=-1):
if hasattr(_C, '_cuda_setRNGState') and callable(_C._cuda_setRNGState):
# older PyTorch
def cb():
with device_ctx_manager(device):
with get_accelerator().device(device):
_C._cuda_setRNGState(new_state)
else:
# newer PyTorch
if device == -1:
device = torch.device('cuda')
device = torch.device(get_accelerator().device_name())
elif isinstance(device, str):
device = torch.device(device)
elif isinstance(device, int):
device = torch.device('cuda', device)
device = torch.device(get_accelerator().device_name(), device)
def cb():
idx = device.index
if idx is None:
idx = torch.cuda.current_device()
default_generator = torch.cuda.default_generators[idx]
idx = get_accelerator().current_device()
default_generator = get_accelerator().default_generator(idx)
default_generator.set_state(new_state)
_lazy_call(cb)
get_accelerator().lazy_call(cb)
class CudaRNGStatesTracker:
@ -158,10 +158,10 @@ class CudaRNGStatesTracker:
if name in self.states_:
raise Exception('cuda rng state {} already exists'.format(name))
# Get the current rng state.
orig_rng_state = torch.cuda.get_rng_state()
orig_rng_state = get_accelerator().get_rng_state()
# Set the new state and store it.
torch.cuda.manual_seed(seed)
self.states_[name] = torch.cuda.get_rng_state()
get_accelerator().manual_seed(seed)
self.states_[name] = get_accelerator().get_rng_state()
# Reset rng state to what it was.
_set_cuda_rng_state(orig_rng_state)
@ -173,7 +173,7 @@ class CudaRNGStatesTracker:
if name not in self.states_:
raise Exception('cuda rng state {} is not added'.format(name))
# Store current rng state.
orig_cuda_rng_state = torch.cuda.get_rng_state()
orig_cuda_rng_state = get_accelerator().get_rng_state()
# Set rng state to the desired one
_set_cuda_rng_state(self.states_[name])
# Do the stuff we wanted to do.
@ -181,7 +181,7 @@ class CudaRNGStatesTracker:
yield
finally:
# Update the current rng state for later use.
self.states_[name] = torch.cuda.get_rng_state()
self.states_[name] = get_accelerator().get_rng_state()
# And set the state to the original state we started with.
_set_cuda_rng_state(orig_cuda_rng_state)
@ -199,7 +199,7 @@ def model_parallel_cuda_manual_seed(seed):
"""Initialize model parallel cuda seed.
This function should be called after the model parallel is
initialized. Also, no torch.cuda.manual_seed should be called
initialized. Also, no get_accelerator().manual_seed should be called
after this function. Basically, this is replacement for that
function.
Two set of RNG states are tracked:
@ -235,7 +235,7 @@ def model_parallel_cuda_manual_seed(seed):
)
_CUDA_RNG_STATE_TRACKER.reset()
# Set the default state.
torch.cuda.manual_seed(data_parallel_seed)
get_accelerator().manual_seed(data_parallel_seed)
# and model parallel state.
_CUDA_RNG_STATE_TRACKER.add(_MODEL_PARALLEL_RNG_TRACKER_NAME, model_parallel_seed)
@ -516,7 +516,7 @@ class CheckpointFunction(torch.autograd.Function):
ctx.tensor_flags = tensor_flags
if SYNCHRONIZE:
torch.cuda.synchronize()
get_accelerator().synchronize()
if timers is None and PROFILE_TIME:
timers = Timers()
@ -559,8 +559,8 @@ class CheckpointFunction(torch.autograd.Function):
logger.info(f"----Synchronization {SYNCHRONIZE}")
logger.info(f"----Profiling time in checkpointing {PROFILE_TIME}")
cuda_device = torch.cuda.current_device()
transport_stream = torch.cuda.Stream(device=cuda_device)
cuda_device = get_accelerator().current_device_name()
transport_stream = get_accelerator().Stream(device=cuda_device)
if PARTITION_ACTIVATIONS:
inputs = partition_activations(args,
@ -578,7 +578,7 @@ class CheckpointFunction(torch.autograd.Function):
# Copy the rng states.
ctx.fwd_cpu_rng_state = torch.get_rng_state()
ctx.fwd_cuda_rng_state = torch.cuda.get_rng_state()
ctx.fwd_cuda_rng_state = get_accelerator().get_rng_state()
ctx.fwd_cuda_rng_state_tracker = get_cuda_rng_tracker().get_states()
see_memory_usage("Before running forward on the layer", force=False)
@ -606,7 +606,7 @@ class CheckpointFunction(torch.autograd.Function):
timers('forward').stop()
timers.log(['forward'])
if SYNCHRONIZE:
torch.cuda.synchronize()
get_accelerator().synchronize()
# Tensors returned from forward() may not be differentiable.
if torch.is_tensor(outputs):
@ -633,7 +633,7 @@ class CheckpointFunction(torch.autograd.Function):
# so that they can be garbage collected once the checkpoints
# have been used
if SYNCHRONIZE:
torch.cuda.synchronize()
get_accelerator().synchronize()
if PROFILE_TIME:
timers('backward').start()
@ -659,7 +659,7 @@ class CheckpointFunction(torch.autograd.Function):
global cuda_device, transport_stream, PARTITION_ACTIVATIONS
if PARTITION_ACTIVATIONS:
# with torch.cuda.stream(transport_stream):
# with get_accelerator().stream(transport_stream):
inputs = gather_partitioned_activations(
ctx.deepspeed_saved_tensors,
device=cuda_device if CPU_CHECKPOINT else None)
@ -680,7 +680,7 @@ class CheckpointFunction(torch.autograd.Function):
# Store the current states.
bwd_cpu_rng_state = torch.get_rng_state()
bwd_cuda_rng_state = torch.cuda.get_rng_state()
bwd_cuda_rng_state = get_accelerator().get_rng_state()
bwd_cuda_rng_state_tracker = get_cuda_rng_tracker().get_states()
# Set the states to what it used to be before the forward pass.
@ -689,7 +689,7 @@ class CheckpointFunction(torch.autograd.Function):
get_cuda_rng_tracker().set_states(ctx.fwd_cuda_rng_state_tracker)
# if PARTITION_ACTIVATIONS:
# current_stream=torch.cuda.current_stream()
# current_stream=get_accelerator().current_stream()
# current_stream.wait_stream(transport_stream)
see_memory_usage("In backward checkpointing code before forward", force=False)
@ -734,7 +734,7 @@ class CheckpointFunction(torch.autograd.Function):
timers('backward').stop()
timers.log(['backward'])
if SYNCHRONIZE:
torch.cuda.synchronize()
get_accelerator().synchronize()
ret_list = [None, None] # first None for ctx
for inp in detached_inputs:
if torch.is_tensor(inp):
@ -861,7 +861,7 @@ def configure(
checkpoint_in_cpu: Optional: Moves the activation checkpoint to CPU. Only works with
partition_activation. Default is false. Will overwrite deepspeed_config if provided
synchronize: Optional: Performs torch.cuda.synchronize() at the beginning and end of
synchronize: Optional: Performs get_accelerator().synchronize() at the beginning and end of
each call to deepspeed.checkpointing.checkpoint for both forward and backward pass.
By default false. Will overwrite deepspeed_config if provided
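
A minimal sketch of the RNG bookkeeping that the tracker above performs, expressed purely against the accelerator interface (the helper name is hypothetical, and the real tracker also manages named states and the CPU RNG):

```python
from deepspeed.accelerator import get_accelerator

def run_with_temporary_seed(seed, fn, *args, **kwargs):
    # Save the accelerator RNG state, run fn under a fresh seed, then restore
    # the original state so surrounding code sees no change.
    orig_state = get_accelerator().get_rng_state()
    get_accelerator().manual_seed(seed)
    try:
        return fn(*args, **kwargs)
    finally:
        get_accelerator().set_rng_state(orig_state)
```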


@ -8,6 +8,7 @@ import cupy
import numpy as np
from deepspeed.runtime.compression.cupy import CupyBackend
from deepspeed.accelerator import get_accelerator
class NcclBackend(object):
@ -100,7 +101,8 @@ class NcclBackend(object):
recvbuf_scale = [
torch.zeros(1,
dtype=worker_scale.dtype,
device=torch.device(local_rank)) for i in range(self.size)
device=torch.device(get_accelerator().device_name(local_rank)))
for i in range(self.size)
]
# communication phase 1


@ -23,6 +23,7 @@ import numpy as np
import deepspeed.comm as dist
from deepspeed.utils import logger
from deepspeed.accelerator import get_accelerator
from ..constants import *
from ..curriculum_scheduler import CurriculumScheduler
from .indexed_dataset import MMapIndexedDataset
@ -326,11 +327,11 @@ class DeepSpeedDataSampler(object):
samples_per_cluster[cidx])
self.np_rng.shuffle(batch)
batch = torch.tensor(batch,
device=torch.cuda.current_device(),
device=get_accelerator().current_device_name(),
dtype=torch.long).view(-1)
else:
batch = torch.empty(self.global_batch_size,
device=torch.cuda.current_device(),
device=get_accelerator().current_device_name(),
dtype=torch.long)
dist.broadcast(batch, 0, group=self.data_parallel_group)
self.batch = batch.tolist()


@ -2,9 +2,9 @@
Copyright 2019 The Microsoft DeepSpeed Team
'''
import torch
from torch.utils.data import DataLoader, RandomSampler
from torch.utils.data.distributed import DistributedSampler
from deepspeed.accelerator import get_accelerator
from deepspeed.runtime.data_pipeline.data_sampling.data_sampler import DeepSpeedDataSampler
from deepspeed.runtime.data_pipeline.constants import CURRICULUM_LEARNING, \
@ -69,7 +69,7 @@ class DeepSpeedDataLoader(object):
self.deepspeed_dataloader_config[GRADIENT_ACCUMULATION_STEPS],
self.deepspeed_dataloader_config[GLOBAL_RANK],
drop_last=dataloader_drop_last)
device_count = torch.cuda.device_count()
device_count = get_accelerator().device_count()
num_local_io_workers = self.deepspeed_dataloader_config[
DATA_SAMPLING_NUM_WORKERS]
else:
@ -83,7 +83,7 @@ class DeepSpeedDataLoader(object):
else:
if data_sampler is None:
data_sampler = RandomSampler(dataset)
device_count = torch.cuda.device_count()
device_count = get_accelerator().device_count()
batch_size *= device_count
if num_local_io_workers is None:


@ -80,7 +80,6 @@ from deepspeed.runtime.checkpoint_engine.torch_checkpoint_engine import TorchChe
from .pipe.module import PipelineModule
from .utils import ensure_directory_exists, get_ma_status
from ..ops.op_builder import UtilsBuilder
from ..ops.adam import FusedAdam
from ..moe.sharded_moe import TopKGate, MOELayer
from ..moe.layer import MoE
@ -90,6 +89,9 @@ from ..git_version_info import version
from deepspeed.profiling.flops_profiler.profiler import FlopsProfiler
from deepspeed.utils.logging import print_json_dist, print_configuration
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder import UtilsBuilder
from deepspeed.inference.config import DtypeEnum
# Set to torch's distributed package or deepspeed.comm based inside DeepSpeedEngine init
@ -112,11 +114,12 @@ except ImportError:
def split_half_float_double_sparse(tensors):
device_type = get_accelerator().device_name()
supported_types = [
"torch.cuda.HalfTensor",
"torch.cuda.FloatTensor",
"torch.cuda.DoubleTensor",
"torch.cuda.BFloat16Tensor",
"torch.{}.HalfTensor".format(device_type),
"torch.{}.FloatTensor".format(device_type),
"torch.{}.DoubleTensor".format(device_type),
"torch.{}.BFloat16Tensor".format(device_type),
SparseTensor.type()
]
@ -222,7 +225,7 @@ class DeepSpeedEngine(Module):
self.eigenvalue = None
self.block_eigenvalue = None
self.gas_boundary_ctr = 0
self.dist_backend = "nccl"
self.dist_backend = get_accelerator().communication_backend_name()
self.has_moe_layers = False
self.num_experts = []
self.gate_modules = []
@ -966,14 +969,14 @@ class DeepSpeedEngine(Module):
args,
'device_rank') else self.local_rank
if device_rank >= 0:
torch.cuda.set_device(device_rank)
self.device = torch.device("cuda", device_rank)
get_accelerator().set_device(device_rank)
self.device = torch.device(get_accelerator().device_name(), device_rank)
self.world_size = dist.get_world_size()
self.global_rank = dist.get_rank()
else:
self.world_size = 1
self.global_rank = 0
self.device = torch.device("cuda")
self.device = torch.device(get_accelerator().device_name())
# Configure based on command line arguments
def _configure_with_arguments(self, args, mpu):


@ -14,6 +14,7 @@ from deepspeed.runtime.fp16.loss_scaler import INITIAL_LOSS_SCALE, SCALE_WINDOW,
from deepspeed.utils import groups, logger, log_dist
from deepspeed import comm as dist
from deepspeed.checkpoint.constants import OPTIMIZER_STATE_DICT, CLIP_GRAD
from deepspeed.accelerator import get_accelerator
class FP16_Optimizer(DeepSpeedOptimizer):
@ -41,8 +42,8 @@ class FP16_Optimizer(DeepSpeedOptimizer):
self.deepspeed = deepspeed
self.has_moe_layers = has_moe_layers
self.using_pipeline = self.deepspeed.pipeline_parallelism
if not torch.cuda.is_available():
raise SystemError("Cannot use fp16 without CUDA.")
if not get_accelerator().is_available():
raise SystemError("Cannot use fp16 without accelerator.")
self.optimizer = init_optimizer
# param flattened by groups
@ -457,7 +458,7 @@ class FP16_Optimizer(DeepSpeedOptimizer):
will call ``model.load_state_dict()`` before
``fp16_optimizer_instance.load_state_dict()`` is called.
Example::
model = torch.nn.Linear(D_in, D_out).cuda().half()
model = torch.nn.Linear(D_in, D_out).to(get_accelerator().device_name()).half()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
optimizer = FP16_Optimizer(optimizer, static_loss_scale = 128.0)
...


@ -4,6 +4,7 @@ Copyright 2020 The Microsoft DeepSpeed Team
import types
import torch
import numpy as np
from deepspeed.accelerator import get_accelerator
from deepspeed import comm as dist
@ -174,12 +175,12 @@ class OnebitAdam(torch.optim.Optimizer):
(self.size * self.divider)))
state['server_chunk_size'] = state[
'corrected_tensor_size'] // self.size
torch.cuda.empty_cache()
get_accelerator().empty_cache()
state['worker_error'] = torch.zeros(state['corrected_tensor_size'],
device=p.device)
state['server_error'] = torch.zeros(state['server_chunk_size'],
device=p.device)
torch.cuda.empty_cache()
get_accelerator().empty_cache()
self.adam_freeze_key = True
if not self.initialize and dist.get_rank() == 0:
print("Cupy Buffers Initialized Successfully.")


@ -6,6 +6,7 @@ import torch
import numpy as np
from deepspeed import comm as dist
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from deepspeed.accelerator import get_accelerator
class OnebitLamb(torch.optim.Optimizer):
@ -283,7 +284,7 @@ class OnebitLamb(torch.optim.Optimizer):
p.data = q.data
if self.initialize and len(self.worker_errors) == 0:
torch.cuda.empty_cache()
get_accelerator().empty_cache()
for i in range(len(self.exp_avg_flat)):
self.worker_errors.append(
torch.zeros(self.corrected_tensor_sizes[i],
@ -291,20 +292,20 @@ class OnebitLamb(torch.optim.Optimizer):
self.server_errors.append(
torch.zeros(self.server_chunk_sizes[i],
device=self.exp_avg_flat[i].device))
torch.cuda.empty_cache()
get_accelerator().empty_cache()
if self.lamb_freeze_key:
if self.size > 1:
for i in range(len(self.exp_avg_flat)):
if not self.initialize:
torch.cuda.empty_cache()
get_accelerator().empty_cache()
self.worker_errors.append(
torch.zeros(self.corrected_tensor_sizes[i],
device=self.exp_avg_flat[i].device))
self.server_errors.append(
torch.zeros(self.server_chunk_sizes[i],
device=self.exp_avg_flat[i].device))
torch.cuda.empty_cache()
get_accelerator().empty_cache()
if dist.get_rank() == 0:
print("Cupy Buffers Initialized Successfully.")


@ -4,6 +4,7 @@ Copyright 2020 The Microsoft DeepSpeed Team
import types
import torch
import numpy as np
from deepspeed.accelerator import get_accelerator
from deepspeed import comm as dist
@ -185,14 +186,14 @@ class ZeroOneAdam(torch.optim.Optimizer):
(self.size * self.divider)))
state['server_chunk_size'] = state[
'corrected_tensor_size'] // self.size
torch.cuda.empty_cache()
get_accelerator().empty_cache()
state['worker_error'] = torch.zeros(state['corrected_tensor_size'],
device=p.device)
state['server_error'] = torch.zeros(state['server_chunk_size'],
device=p.device)
# Accumulation of momentum, i.e., the u variable in the 0/1 Adam paper
state['momentum_accumulator'] = torch.zeros_like(p.data)
torch.cuda.empty_cache()
get_accelerator().empty_cache()
# self.freeze_key = True
if not self.initialize and dist.get_rank() == 0:
print("Cupy Buffers Initialized Successfully.")


@ -14,6 +14,7 @@ from deepspeed.runtime.utils import get_global_norm, CheckOverflow, get_weight_n
from deepspeed.runtime.fp16.loss_scaler import INITIAL_LOSS_SCALE, SCALE_WINDOW, MIN_LOSS_SCALE
from deepspeed.utils import logger
from deepspeed.checkpoint.constants import OPTIMIZER_STATE_DICT
from deepspeed.accelerator import get_accelerator
from deepspeed import comm as dist
@ -40,8 +41,8 @@ class FP16_UnfusedOptimizer(DeepSpeedOptimizer):
if dist.get_rank() == 0:
logger.info(f'Fused Lamb Legacy : {self.fused_lamb_legacy} ')
if not torch.cuda.is_available():
raise SystemError("Cannot use fp16 without CUDA.")
if not get_accelerator().is_available():
raise SystemError("Cannot use fp16 without accelerator.")
self.optimizer = init_optimizer
# param groups
@ -387,7 +388,7 @@ class FP16_UnfusedOptimizer(DeepSpeedOptimizer):
will call ``model.load_state_dict()`` before
``fp16_optimizer_instance.load_state_dict()`` is called.
Example::
model = torch.nn.Linear(D_in, D_out).cuda().half()
model = torch.nn.Linear(D_in, D_out).to(get_accelerator().device_name()).half()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
optimizer = FP16_Optimizer(optimizer, static_loss_scale = 128.0)
...
@ -432,13 +433,13 @@ class FP16_UnfusedOptimizer(DeepSpeedOptimizer):
for param in group:
param.grad = torch.zeros(param.size(),
dtype=param.dtype,
device=torch.cuda.current_device())
device=get_accelerator().current_device_name())
for i, group in enumerate(self.fp32_groups):
for param in group:
param.grad = torch.zeros(param.size(),
dtype=param.dtype,
device=torch.cuda.current_device())
device=get_accelerator().current_device_name())
self.optimizer.step()


@ -7,6 +7,7 @@ from deepspeed import comm as dist
from deepspeed.utils import logger
from deepspeed.utils.timer import ThroughputTimer
from deepspeed.accelerator import get_accelerator
from ..engine import DeepSpeedEngine, MEMORY_OPT_ALLREDUCE_SIZE
from ..utils import PartitionedTensor
@ -1270,14 +1271,14 @@ class PipelineEngine(DeepSpeedEngine):
if print_rank != -1 and rank != print_rank:
return
torch.cuda.synchronize()
get_accelerator().synchronize()
if reset_max:
torch.cuda.reset_max_memory_cached()
torch.cuda.reset_max_memory_allocated()
get_accelerator().reset_max_memory_cached()
get_accelerator().reset_max_memory_allocated()
new_alloced = torch.cuda.memory_allocated()
new_cached = torch.cuda.memory_cached()
new_alloced = get_accelerator().memory_allocated()
new_cached = get_accelerator().memory_cached()
delta_alloced = new_alloced - mem_alloced
delta_cached = new_cached - mem_cached
@ -1285,8 +1286,8 @@ class PipelineEngine(DeepSpeedEngine):
mem_cached = new_cached
mem_alloced = new_alloced
max_alloced = torch.cuda.max_memory_allocated()
max_cached = torch.cuda.max_memory_cached()
max_alloced = get_accelerator().max_memory_allocated()
max_cached = get_accelerator().max_memory_cached()
# convert to GB for printing
new_alloced /= 1024**3


@ -14,6 +14,7 @@ from .. import utils as ds_utils
from ..activation_checkpointing import checkpointing
from .topology import PipeDataParallelTopology, PipelineParallelGrid
from deepspeed.runtime.state_dict_factory import SDLoaderFactory
from deepspeed.accelerator import get_accelerator
class PipelineError(Exception):
@ -195,12 +196,12 @@ class PipelineModule(nn.Module):
self.tied_weight_attrs = {}
# Offset the random seed by the stage ID.
#newseed = torch.cuda.initial_seed() + self._grid.get_stage_id()
#newseed = get_accelerator().initial_seed() + self._grid.get_stage_id()
#ds_utils.set_random_seed(newseed)
#with torch.random.fork_rng(devices=[torch.cuda.current_device()]):
#with torch.random.fork_rng(devices=[get_accelerator().current_device_name()]):
self._build()
self.to(f'cuda:{self.local_rank}')
self.to(get_accelerator().device_name(self.local_rank))
self.tied_comms = self._index_tied_modules()
self._synchronize_tied_weights()


@ -11,6 +11,7 @@ from deepspeed import comm as dist
# To query whether we have send/recv support
from packaging.version import Version
from deepspeed.git_version_info import torch_info
from deepspeed.accelerator import get_accelerator
_groups = None
_grid = None
@ -92,7 +93,7 @@ def wait():
op.wait()
_async = []
torch.cuda.synchronize()
get_accelerator().synchronize()
def send_obj(msg: typing.Any, dest: int):
@ -110,10 +111,12 @@ def send_obj(msg: typing.Any, dest: int):
# serialize the message
msg = pickle.dumps(msg)
# construct a tensor to send
msg = torch.ByteTensor(torch.ByteStorage.from_buffer(msg)).cuda()
msg = torch.ByteTensor(torch.ByteStorage.from_buffer(msg)).to(
get_accelerator().device_name())
# Send meta and message
length_tensor = torch.tensor([len(msg)], dtype=torch.long).cuda()
length_tensor = torch.tensor([len(msg)],
dtype=torch.long).to(get_accelerator().device_name())
dist.send(length_tensor, dst=dest)
dist.send(msg, dst=dest)
@ -128,11 +131,12 @@ def recv_obj(sender: int) -> typing.Any:
sender (int): The rank sending the message.
"""
# Get message meta
length = torch.tensor([0], dtype=torch.long).cuda()
length = torch.tensor([0], dtype=torch.long).to(get_accelerator().device_name())
dist.recv(length, src=sender)
# Receive and deserialize
msg = torch.empty(length.item(), dtype=torch.uint8).cuda()
msg = torch.empty(length.item(),
dtype=torch.uint8).to(get_accelerator().device_name())
dist.recv(msg, src=sender)
msg = pickle.loads(msg.cpu().numpy().tobytes())
@ -140,7 +144,7 @@ def recv_obj(sender: int) -> typing.Any:
def _to(x):
"""Recursively move to the current device."""
if torch.is_tensor(x):
return x.cuda()
return x.to(get_accelerator().device_name())
if isinstance(x, (tuple, list)):
ret = [_to(x_) for x_ in x]
if isinstance(x, tuple):


@ -8,7 +8,7 @@ Functionality of swapping optimizer tensors to/from (NVMe) storage devices.
import torch
from deepspeed.utils.logging import logger
from deepspeed.ops.aio import AsyncIOBuilder
from deepspeed.ops.op_builder import AsyncIOBuilder
from deepspeed import comm as dist
from deepspeed.runtime.swap_tensor.constants import *


@ -10,8 +10,8 @@ import shutil
from enum import Enum
import torch
from deepspeed import comm as dist
from deepspeed.ops.aio import AsyncIOBuilder
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder import AsyncIOBuilder
from .constants import *
from .utils import swap_in_tensors, swap_out_tensors, MIN_AIO_BYTES, AIO_ALIGNED_BYTES, print_object, SwapBufferPool
@ -107,11 +107,10 @@ class AsyncPartitionedParameterSwapper(object):
self.available_buffer_ids = [i for i in range(self.param_buffer_count)]
self.reserved_buffer_ids = []
self.buffers = torch.empty(int(self.aligned_elements_per_buffer *
self.param_buffer_count),
dtype=self.dtype,
pin_memory=True,
requires_grad=False)
self.buffers = get_accelerator().pin_memory(
torch.empty(int(self.aligned_elements_per_buffer * self.param_buffer_count),
dtype=self.dtype,
requires_grad=False))
self.aio_read_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE],
self.aio_config[AIO_QUEUE_DEPTH],
@ -393,9 +392,10 @@ class AsyncPartitionedParameterSwapper(object):
def reserve_partitioned_swap_space(self, partition_num_elems):
aligned_numel = sum(
[self._io_aligned_numel(numel) for numel in partition_num_elems])
self.partitioned_swap_buffer = torch.zeros(aligned_numel,
device='cpu',
dtype=self.dtype).pin_memory()
self.partitioned_swap_buffer = get_accelerator().pin_memory(
torch.zeros(aligned_numel,
device='cpu',
dtype=self.dtype))
self.partitioned_swap_pool = SwapBufferPool([self.partitioned_swap_buffer])
def swap_out_partitioned_params(self, dst_fp16_params, src_fp32_params):


@ -5,7 +5,7 @@ Licensed under the MIT license.
Functionality of swapping optimizer tensors to/from (NVMe) storage devices.
"""
from deepspeed.ops.aio import AsyncIOBuilder
from deepspeed.ops.op_builder import AsyncIOBuilder
from deepspeed import comm as dist
from deepspeed.runtime.swap_tensor.constants import *


@ -7,6 +7,7 @@ Functionality of swapping tensors to/from (NVMe) storage devices.
import torch
from deepspeed.utils.logging import logger
from deepspeed.accelerator import get_accelerator
from deepspeed import comm as dist
@ -179,9 +180,10 @@ class SwapBufferManager(object):
self.count = count
self.dtype = dtype
self.all_buffers = [
torch.zeros(num_elems,
device='cpu',
dtype=dtype).pin_memory() for _ in range(count)
get_accelerator().pin_memory(
torch.zeros(num_elems,
device='cpu',
dtype=dtype)) for _ in range(count)
]
self.free_buffer_index = [i for i in range(count)]
self.used_buffer_index = {}
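
Both swap-tensor changes above replace tensor.pin_memory() / pin_memory=True with get_accelerator().pin_memory(...), so non-CUDA backends can supply their own pinned-host allocation. A minimal sketch of the pattern (the is_pinned check is only guaranteed on the CUDA backend):

import torch
from deepspeed.accelerator import get_accelerator

# allocate a pageable CPU buffer first, then ask the accelerator to pin it
buffer = get_accelerator().pin_memory(
    torch.zeros(1024, dtype=torch.float16, device='cpu'))
assert buffer.is_pinned()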


@ -22,16 +22,10 @@ from deepspeed import comm as dist
from deepspeed.utils import groups, logger
from deepspeed.runtime.constants import PIPE_REPLICATED
from numpy import prod
from deepspeed.accelerator import get_accelerator
# pt-1.9 deprecations
if hasattr(torch.cuda, "memory_reserved"):
torch_memory_reserved = torch.cuda.memory_reserved
else:
torch_memory_reserved = torch.cuda.memory_allocated
if hasattr(torch.cuda, "max_memory_reserved"):
torch_max_memory_reserved = torch.cuda.max_memory_reserved
else:
torch_max_memory_reserved = torch.cuda.memory_cached
torch_memory_reserved = get_accelerator().memory_reserved
torch_max_memory_reserved = get_accelerator().max_memory_reserved
class DummyOptim():
@ -191,7 +185,7 @@ class CheckOverflow(object):
def check_using_norm(self, norm_group, reduce_overflow=True):
# TODO: I don't think reduce_overflow is needed if mpu is None
overflow = -1 in norm_group
overflow_gpu = torch.cuda.FloatTensor([overflow])
overflow_gpu = get_accelerator().FloatTensor([overflow])
if self.has_moe_params:
# In this case, we need to do an all_reduce across
# the expert_parallel_group, so that if there was
@ -242,7 +236,7 @@ class CheckOverflow(object):
overflow = self.has_overflow_serial(params)
# Since each model parallel GPU carries only part of the model,
# make sure overflow flag is synced across all the model parallel GPUs
overflow_gpu = torch.cuda.ByteTensor([overflow])
overflow_gpu = get_accelerator().ByteTensor([overflow])
# deepspeed.comm.all_reduce(overflow_gpu,
# op=deepspeed.comm.ReduceOp.MAX,
# group=mpu.get_model_parallel_group())
@ -352,7 +346,7 @@ def clip_grad_norm_(parameters, max_norm, norm_type=2, mpu=None):
norm_type = float(norm_type)
if norm_type == inf:
total_norm = max(p.grad.data.abs().max() for p in parameters)
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
# Take max across all GPUs.
if mpu is not None:
dist.all_reduce(total_norm_cuda,
@ -372,7 +366,7 @@ def clip_grad_norm_(parameters, max_norm, norm_type=2, mpu=None):
total_norm += param_norm.item()**norm_type
# Sum across all model parallel GPUs.
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
if mpu is not None:
dist.all_reduce(total_norm_cuda,
op=dist.ReduceOp.SUM,
@ -383,7 +377,7 @@ def clip_grad_norm_(parameters, max_norm, norm_type=2, mpu=None):
pg = groups._get_data_parallel_group()
scaled_norm = total_norm * 1.0 / float(dist.get_world_size(group=pg))
scaled_norm_tensor = torch.cuda.FloatTensor([float(scaled_norm)])
scaled_norm_tensor = get_accelerator().FloatTensor([float(scaled_norm)])
dist.all_reduce(scaled_norm_tensor, group=pg)
total_norm = scaled_norm_tensor.item()
@ -418,7 +412,7 @@ def get_grad_norm(parameters, norm_type=2, mpu=None):
norm_type = float(norm_type)
if norm_type == inf:
total_norm = max(p.grad.data.abs().max() for p in parameters)
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
# Take max across all GPUs.
if mpu is not None:
dist.all_reduce(total_norm_cuda,
@ -442,7 +436,7 @@ def get_grad_norm(parameters, norm_type=2, mpu=None):
total_norm += param_norm.item()**norm_type
# Sum across all model parallel GPUs.
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
if mpu is not None:
dist.all_reduce(total_norm_cuda,
op=dist.ReduceOp.SUM,
@ -488,7 +482,7 @@ def get_grad_zeros(parameters, mpu=None):
total_zeros += count_zeros.item()
# Sum across all model parallel GPUs.
total_zeros_cuda = torch.cuda.FloatTensor([float(total_zeros)])
total_zeros_cuda = get_accelerator().FloatTensor([float(total_zeros)])
if mpu is not None:
dist.all_reduce(total_zeros_cuda,
op=dist.ReduceOp.SUM,
@ -521,7 +515,7 @@ def get_weight_norm(parameters, norm_type=2, mpu=None):
norm_type = float(norm_type)
if norm_type == inf:
total_norm = max(p.data.abs().max() for p in parameters)
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
# Take max across all GPUs.
if mpu is not None:
dist.all_reduce(total_norm_cuda,
@ -545,7 +539,7 @@ def get_weight_norm(parameters, norm_type=2, mpu=None):
total_norm += param_norm**norm_type
# Sum across all model parallel GPUs.
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
if mpu is not None:
dist.all_reduce(total_norm_cuda,
op=dist.ReduceOp.SUM,
@ -669,7 +663,7 @@ class PartitionedTensor:
self.local_data, self.partition = self._partition_tensor(tensor)
@classmethod
def from_meta(cls, meta, local_part, group, device='cuda'):
def from_meta(cls, meta, local_part, group, device=get_accelerator().device_name()):
assert meta.dtype == torch.long
dummy = torch.ones(dist.get_world_size(group=group))
part_obj = cls(tensor=dummy, group=group)
@ -773,14 +767,14 @@ def memory_status(msg, print_rank=-1, reset_max=False):
if print_rank != -1 and rank != print_rank:
return
torch.cuda.synchronize()
get_accelerator().synchronize()
if reset_max:
torch.cuda.reset_max_memory_cached()
torch.cuda.reset_max_memory_allocated()
get_accelerator().reset_max_memory_cached()
get_accelerator().reset_max_memory_allocated()
new_alloced = torch.cuda.memory_allocated()
new_cached = torch.cuda.memory_cached()
new_alloced = get_accelerator().memory_allocated()
new_cached = get_accelerator().memory_cached()
delta_alloced = new_alloced - mem_alloced
delta_cached = new_cached - mem_cached
@ -788,8 +782,8 @@ def memory_status(msg, print_rank=-1, reset_max=False):
mem_cached = new_cached
mem_alloced = new_alloced
max_alloced = torch.cuda.max_memory_allocated()
max_cached = torch.cuda.max_memory_cached()
max_alloced = get_accelerator().max_memory_allocated()
max_cached = get_accelerator().max_memory_cached()
# convert to GB for printing
new_alloced /= 1024**3
@ -802,7 +796,7 @@ def memory_status(msg, print_rank=-1, reset_max=False):
print(
f'RANK={rank} MEMSTATS',
msg,
f'device={torch.cuda.current_device()} '
f'device={get_accelerator().current_device_name()} '
f'current alloc={new_alloced:0.4f}GB (delta={delta_alloced:0.4f}GB max={max_alloced:0.4f}GB) '
f'current cache={new_cached:0.4f}GB (delta={delta_cached:0.4f}GB max={max_cached:0.4f}GB)'
)
@ -811,11 +805,11 @@ def memory_status(msg, print_rank=-1, reset_max=False):
def get_ma_status():
if dist.is_initialized() and not dist.get_rank() == 0:
return 0
return torch.cuda.memory_allocated()
return get_accelerator().memory_allocated()
def empty_cache():
torch.cuda.empty_cache()
get_accelerator().empty_cache()
def see_memory_usage(message, force=False):
@ -830,8 +824,8 @@ def see_memory_usage(message, force=False):
# Print message except when distributed but not rank 0
logger.info(message)
logger.info(
f"MA {round(torch.cuda.memory_allocated() / (1024 * 1024 * 1024),2 )} GB \
Max_MA {round(torch.cuda.max_memory_allocated() / (1024 * 1024 * 1024),2)} GB \
f"MA {round(get_accelerator().memory_allocated() / (1024 * 1024 * 1024),2 )} GB \
Max_MA {round(get_accelerator().max_memory_allocated() / (1024 * 1024 * 1024),2)} GB \
CA {round(torch_memory_reserved() / (1024 * 1024 * 1024),2)} GB \
Max_CA {round(torch_max_memory_reserved() / (1024 * 1024 * 1024))} GB ")
@ -841,8 +835,7 @@ def see_memory_usage(message, force=False):
f'CPU Virtual Memory: used = {used_GB} GB, percent = {vm_stats.percent}%')
# get the peak memory to report correct data, so reset the counter for the next call
if hasattr(torch.cuda, "reset_peak_memory_stats"): # pytorch 1.4+
torch.cuda.reset_peak_memory_stats()
get_accelerator().reset_peak_memory_stats()
def call_to_str(base, *args, **kwargs):
@ -916,7 +909,7 @@ def get_global_norm_of_tensors(input_tensors, norm_type=2, mpu=None):
norm_type = float(norm_type)
if norm_type == inf:
total_norm = max(t.data.abs().max() for t in input_tensors)
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
if mpu is not None:
dist.all_reduce(total_norm_cuda,
op=dist.ReduceOp.MAX,
@ -925,7 +918,7 @@ def get_global_norm_of_tensors(input_tensors, norm_type=2, mpu=None):
else:
total_norm = sum(
[t.data.float().norm(norm_type).item()**norm_type for t in input_tensors])
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
if mpu is not None:
dist.all_reduce(total_norm_cuda,
op=dist.ReduceOp.SUM,
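
The norm and overflow helpers above reach backend tensor types through the accelerator object rather than torch.cuda. A small sketch of that substitution, with the distributed all_reduce omitted:

import torch
from deepspeed.accelerator import get_accelerator

grads = [torch.randn(8), torch.randn(8)]
total = sum(g.float().norm(2).item() ** 2 for g in grads)

# torch.cuda.FloatTensor / ByteTensor become accelerator-provided types
total_norm_device = get_accelerator().FloatTensor([float(total)])
overflow_flag = get_accelerator().ByteTensor([False])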


@ -1,5 +1,6 @@
import torch
from ..module_inject.replace_policy import HFBertLayerPolicy, replace_policies
from deepspeed.accelerator import get_accelerator
class WeightQuantization(object):
@ -44,9 +45,11 @@ class WeightQuantization(object):
q_scale.append(data_scale)
value_list[index] = data_int
index += 1
q_scale = (1 / torch.cat(q_scale,
dim=merge_dim).to(
torch.cuda.current_device()).view(-1).unsqueeze(0))
q_scale = (
1 /
torch.cat(q_scale,
dim=merge_dim).to(
get_accelerator().current_device_name()).view(-1).unsqueeze(0))
if "mlp.dense_4h_to_h.weight" in key:
self.mlp4hh_scales.append(q_scale)
elif "mlp.dense_h_to_4h.weight" in key:
@ -63,7 +66,7 @@ class WeightQuantization(object):
torch.cat((s,
torch.zeros((1,
max_dim - s.shape[-1]),
device=torch.cuda.current_device())),
device=get_accelerator().current_device_name())),
dim=-1) if s.shape[-1] < max_dim else s for s in layer_scales
]
return torch.cat(layer_scales).unsqueeze(0)
@ -134,9 +137,8 @@ class WeightQuantization(object):
else:
data_quantized, data_scale = self.quantize_data(keys[key], quantize_bits, groups)
keys[key].copy_(data_quantized)
layer_scales.append(
(1 /
data_scale.to(torch.cuda.current_device()).view(-1).unsqueeze(0)))
layer_scales.append((1 / data_scale.to(
get_accelerator().current_device_name()).view(-1).unsqueeze(0)))
all_scales.append(self.merge_layer_scales(layer_scales))
return layer


@ -19,6 +19,7 @@ from torch.nn import init
from torch.nn.modules.module import Module
from deepspeed.runtime.utils import noop_decorator
from deepspeed import comm as dist
from deepspeed.accelerator import get_accelerator
tensor_map = {}
@ -28,10 +29,15 @@ def print_rank_0(message, debug=False, force=False):
print(message)
try:
autocast_custom_fwd = torch.cuda.amp.custom_fwd
autocast_custom_bwd = torch.cuda.amp.custom_bwd
except (ImportError, AttributeError) as exp:
device = get_accelerator().device_name()
if device == 'cuda':
try:
autocast_custom_fwd = torch.cuda.amp.custom_fwd
autocast_custom_bwd = torch.cuda.amp.custom_bwd
except (ImportError, AttributeError) as exp:
autocast_custom_fwd = noop_decorator
autocast_custom_bwd = noop_decorator
else:
autocast_custom_fwd = noop_decorator
autocast_custom_bwd = noop_decorator


@ -5,7 +5,6 @@ Licensed under the MIT license.
import sys
import torch
from torch.cuda import Stream
from collections import OrderedDict
from deepspeed.runtime.utils import see_memory_usage
from deepspeed.runtime.zero.offload_config import OffloadDeviceEnum
@ -13,6 +12,7 @@ from deepspeed.runtime.zero.partition_parameters import _init_external_params
from deepspeed.runtime.zero.partition_parameters import *
from deepspeed.runtime.zero.partitioned_param_coordinator import PartitionedParameterCoordinator, iter_params
from deepspeed import comm as dist
from deepspeed.accelerator import get_accelerator
FWD_MODULE_STACK = list()
@ -223,8 +223,8 @@ class DeepSpeedZeRoOffload(object):
self._prefetch_bucket_sz = int(prefetch_bucket_size)
self._max_reuse_distance_in_numel = int(max_reuse_distance)
self._max_available_parameters_in_numel = int(max_live_parameters)
self.__allgather_stream = Stream(
) if overlap_comm else torch.cuda.default_stream()
self.__allgather_stream = get_accelerator().Stream(
) if overlap_comm else get_accelerator().default_stream()
self.forward_hooks = []
self.backward_hooks = []
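
Because Stream is now a property returning the backend's stream class, the offload engine can construct side streams and fall back to the default stream without importing torch.cuda. A sketch of that usage under the same assumptions:

from deepspeed.accelerator import get_accelerator

acc = get_accelerator()
allgather_stream = acc.Stream()          # like torch.cuda.Stream() on CUDA

with acc.stream(allgather_stream):
    pass                                  # enqueue allgather work here
acc.current_stream().wait_stream(allgather_stream)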


@ -31,6 +31,7 @@ from deepspeed.utils.debug import (debug_param2name_id_shape,
debug_module2name,
debug_param2name_id,
debug_param2name_id_shape_status)
from deepspeed.accelerator import get_accelerator
from ..swap_tensor.partitioned_param_swapper import AsyncPartitionedParameterSwapper, PartitionedParamStatus
param_count = 0
@ -191,7 +192,8 @@ def zero_wrapper_for_fp_tensor_constructor(fn: Callable,
target_fp_dtype: torch.dtype) -> Callable:
def wrapped_fn(*args, **kwargs) -> Tensor:
if kwargs.get("device", None) is None:
kwargs['device'] = torch.device('cuda:{}'.format(os.environ["LOCAL_RANK"]))
kwargs['device'] = torch.device(get_accelerator().device_name(
os.environ["LOCAL_RANK"]))
tensor: Tensor = fn(*args, **kwargs)
if tensor.is_floating_point():
tensor = tensor.to(target_fp_dtype)
@ -203,7 +205,7 @@ def zero_wrapper_for_fp_tensor_constructor(fn: Callable,
def get_new_tensor_fn_for_dtype(dtype: torch.dtype) -> Callable:
def new_tensor(cls, *args) -> Tensor:
device = torch.device('cuda:{}'.format(os.environ["LOCAL_RANK"]))
device = torch.device(get_accelerator().device_name(os.environ["LOCAL_RANK"]))
tensor = _orig_torch_empty(0, device=device).new_empty(*args)
if tensor.is_floating_point():
tensor = tensor.to(dtype)
@ -231,10 +233,10 @@ def get_all_subclasses(cls):
def free_param(param: Parameter) -> None:
"""Free underlying storage of a parameter."""
assert not param.ds_active_sub_modules, param.ds_summary()
if param.data.is_cuda:
if get_accelerator().on_accelerator(param.data):
# need to make sure that we don't free the parameter while it is still
# being used for computation
param.data.record_stream(torch.cuda.current_stream())
param.data.record_stream(get_accelerator().current_stream())
# param.data doesn't store anything meaningful in partitioned state
param.data = torch.empty(0, dtype=param.dtype, device=param.device)
param.ds_status = ZeroParamStatus.NOT_AVAILABLE
@ -526,7 +528,7 @@ class AllGatherCoalescedHandle:
param.ds_status = ZeroParamStatus.AVAILABLE
for part_to_copy in partitions:
part_to_copy.record_stream(torch.cuda.current_stream())
part_to_copy.record_stream(get_accelerator().current_stream())
param_offset += param.ds_tensor.ds_numel
@ -672,8 +674,9 @@ class Init(InsertPostInitMethodToModuleSubClasses):
# Local device is the device where the parameters are consumed, must be default device.
# It is the device where parameters are fully instantiated using allgather
self.local_device = torch.device('cuda:{}'.format(os.environ["LOCAL_RANK"]))
torch.cuda.set_device(self.local_device)
self.local_device = torch.device(get_accelerator().device_name(
os.environ["LOCAL_RANK"]))
get_accelerator().set_device(self.local_device)
if _ds_config is not None and _ds_config.zero_config.offload_param is not None:
remote_device = _ds_config.zero_config.offload_param.device
@ -747,7 +750,7 @@ class Init(InsertPostInitMethodToModuleSubClasses):
f"Partitioning param {debug_param2name_id_shape(param)} module={debug_module2name(module)}"
)
if param.is_cuda:
if get_accelerator().on_accelerator(param):
dist.broadcast(param, 0, self.ds_process_group)
else:
if dist.get_rank() == 0:
@ -839,11 +842,11 @@ class Init(InsertPostInitMethodToModuleSubClasses):
param_buffer = torch.empty(
math.ceil(param.ds_numel / self.world_size) * self.world_size,
dtype=param.dtype,
device=torch.cuda.current_device(),
device=get_accelerator().current_device_name(),
requires_grad=False,
)
handle = _dist_allgather_fn(
param.ds_tensor.to(torch.cuda.current_device()),
param.ds_tensor.to(get_accelerator().current_device_name()),
param_buffer,
self.ds_process_group)
param.data = param_buffer.narrow(0,
@ -856,7 +859,7 @@ class Init(InsertPostInitMethodToModuleSubClasses):
flat_tensor = torch.empty(partition_sz * self.world_size,
dtype=get_only_unique_item(p.dtype
for p in params),
device=torch.cuda.current_device(),
device=get_accelerator().current_device_name(),
requires_grad=False)
partitions: List[Parameter] = []
for i in range(self.world_size):
@ -865,9 +868,11 @@ class Init(InsertPostInitMethodToModuleSubClasses):
partition_sz * i,
partition_sz))
instrument_w_nvtx(torch.cat)(
[p.ds_tensor.to(torch.cuda.current_device()) for p in params],
out=partitions[self.rank])
instrument_w_nvtx(torch.cat)([
p.ds_tensor.to(get_accelerator().current_device_name())
for p in params
],
out=partitions[self.rank])
handle = _dist_allgather_fn(partitions[self.rank],
flat_tensor,
self.ds_process_group)
@ -1103,7 +1108,8 @@ class Init(InsertPostInitMethodToModuleSubClasses):
device=OffloadDeviceEnum.cpu if self.remote_device
== OffloadDeviceEnum.nvme else self.remote_device)
if self.pin_memory:
partitioned_tensor = partitioned_tensor.pin_memory()
partitioned_tensor = get_accelerator().pin_memory(
partitioned_tensor)
partitioned_tensor.requires_grad = False
param.ds_tensor = partitioned_tensor
@ -1195,7 +1201,7 @@ class Init(InsertPostInitMethodToModuleSubClasses):
f'After allocate allgather param {debug_param2name_id_shape_status(param)} {aligned_param_size} {partition_size} ',
force=False)
torch.cuda.synchronize()
get_accelerator().synchronize()
print_rank_0(
f"{'--'* hierarchy}----allgather param with {debug_param2name_id_shape_status(param)} partition size={partition_size}"
@ -1209,7 +1215,8 @@ class Init(InsertPostInitMethodToModuleSubClasses):
if self.use_all_gather_base:
# try the _all_gather_base on PyTorch master branch
handle = dist.all_gather_base(flat_tensor,
param.ds_tensor.cuda(),
param.ds_tensor.to(
get_accelerator().device_name()),
group=self.ds_process_group,
async_op=async_op)
else:
@ -1243,7 +1250,7 @@ class Init(InsertPostInitMethodToModuleSubClasses):
local_tensors = []
for param in param_list:
partition_sizes.append(param.ds_tensor.ds_numel)
local_tensors.append(param.ds_tensor.cuda())
local_tensors.append(param.ds_tensor.to(get_accelerator().device_name()))
# allocate memory for allgather params
allgather_params = []
@ -1274,7 +1281,7 @@ class Init(InsertPostInitMethodToModuleSubClasses):
psize = partition_sizes[param_idx]
partition = allgather_params[param_idx].narrow(0, i * psize, psize)
output_list.append(partition)
if not partition.is_cuda:
if not get_accelerator().on_accelerator(partition):
logger.warning(
f'param {param_idx}, partition {i} is not on CUDA, partition shape {partition.size()}'
)
@ -1297,7 +1304,7 @@ class Init(InsertPostInitMethodToModuleSubClasses):
param.ds_numel).view(param.ds_shape).data
# guarantee the communication to be completed
torch.cuda.synchronize()
get_accelerator().synchronize()
return None


@ -7,7 +7,6 @@ from dataclasses import dataclass
import collections
from collections import UserDict
from typing import Deque, Set
from torch.cuda import Event, Stream
from deepspeed import comm as dist
from deepspeed.utils.logging import logger
@ -15,6 +14,7 @@ from deepspeed.runtime.zero.offload_config import OffloadDeviceEnum
from deepspeed.runtime.zero.partition_parameters import *
from deepspeed.runtime.swap_tensor.partitioned_param_swapper import PartitionedParamStatus
from deepspeed.utils.debug import debug_module2name_id, debug_param2name_id
from deepspeed.accelerator import get_accelerator
def debug_rank0(message: str) -> None:
@ -66,7 +66,7 @@ class PartitionedParameterCoordinator:
prefetch_bucket_sz: int,
max_reuse_distance_in_numel: int,
max_available_parameters_in_numel: int,
allgather_stream: Stream,
allgather_stream: get_accelerator().Stream,
prefetch_nvme: bool = False,
) -> None:
# mapping of param -> handle for each param that is currently in flight
@ -95,7 +95,7 @@ class PartitionedParameterCoordinator:
self.hierarchy: int = 0
# stream that will be used for allgather operations
self.__allgather_stream: Stream = allgather_stream
self.__allgather_stream: get_accelerator().Stream = allgather_stream
# limit the number of fetch events that can be queued at once
# otherwise, what happens is memory is allocated by the host thread at the
@ -106,7 +106,7 @@ class PartitionedParameterCoordinator:
# cudaMallocAsync/cudaFreeAsync. Choosing to not expose this to the user now
# because ideally in the future its replaced by an async allocation
# mechanism which doesn't require any configuration by the user.
self.__ongoing_fetch_events: Deque[Event] = collections.deque()
self.__ongoing_fetch_events: Deque[get_accelerator().Event] = collections.deque()
# TODO. make this configurable via JSON
self.__max_ongoing_fetch_events: int = 2
@ -262,7 +262,7 @@ class PartitionedParameterCoordinator:
param.ds_active_sub_modules.add(current_submodule.id)
debug_rank0(f"-wait: {param.ds_summary()}")
if param in self.__inflight_param_registry:
with torch.cuda.stream(self.__allgather_stream):
with get_accelerator().stream(self.__allgather_stream):
while self.__ongoing_fetch_events and self.__ongoing_fetch_events[
0].query():
self.__ongoing_fetch_events.popleft()
@ -272,12 +272,12 @@ class PartitionedParameterCoordinator:
self.__inflight_param_registry.pop(param).wait()
event = Event()
event = get_accelerator().Event()
event.record()
self.__ongoing_fetch_events.append(event)
assert param.ds_status == ZeroParamStatus.AVAILABLE, param.ds_summary()
torch.cuda.current_stream().wait_stream(self.__allgather_stream)
get_accelerator().current_stream().wait_stream(self.__allgather_stream)
# kick off parameter prefetches for upcoming modules
# don't prefetch if we dont have a completed model trace
@ -395,7 +395,7 @@ class PartitionedParameterCoordinator:
self.__n_available_params += param.ds_numel
if partitioned_params:
with torch.cuda.stream(self.__allgather_stream):
with get_accelerator().stream(self.__allgather_stream):
handle = partitioned_params[0].all_gather_coalesced(partitioned_params)
for param in partitioned_params:
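
Event follows the same property pattern, which is what lets the coordinator above keep a deque of in-flight fetch events and poll them. A minimal sketch:

import collections
from deepspeed.accelerator import get_accelerator

acc = get_accelerator()
inflight = collections.deque()

evt = acc.Event()        # backend event class, torch.cuda.Event on CUDA
evt.record()             # records on the current stream
inflight.append(evt)

# retire events whose preceding stream work has already completed
while inflight and inflight[0].query():
    inflight.popleft()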

deepspeed/runtime/zero/stage3.py Executable file → Normal file

@ -7,7 +7,6 @@ import sys
import gc
import collections
from typing import Deque, Dict, Tuple
from torch.cuda import Event, Stream
from torch._six import inf
from deepspeed.runtime import ZeROOptimizer
@ -20,11 +19,12 @@ from deepspeed.runtime.zero.config import ZeroStageEnum
from deepspeed.runtime.zero.offload_config import OffloadDeviceEnum
from deepspeed.runtime.zero.parameter_offload import DeepSpeedZeRoOffload
from deepspeed.ops.adam import DeepSpeedCPUAdam
from deepspeed.ops.op_builder import UtilsBuilder
from deepspeed.runtime.swap_tensor.partitioned_param_swapper import PartitionedParamStatus
from deepspeed.runtime.swap_tensor.partitioned_optimizer_swapper import PartitionedOptimizerSwapper
from deepspeed.runtime.swap_tensor.pipelined_optimizer_swapper import PipelinedOptimizerSwapper
from deepspeed.checkpoint.constants import OPTIMIZER_STATE_DICT, FP32_FLAT_GROUPS, PARTITION_COUNT, ZERO_STAGE
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder import UtilsBuilder
# Toggle this to true to enable correctness test
# with gradient partitioning and without
@ -122,8 +122,8 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
# - assume all params requires grad
# - flat by groups, not keeping state. TODO: remove state explicitly?
# - master grad and unflat master weight never exist. TODO: a way to save out unflat master?
if not torch.cuda.is_available():
raise SystemError("Cannot use fp16 without CUDA.")
if not get_accelerator().is_available():
raise SystemError("Cannot use fp16 without accelerator.")
self.optimizer = init_optimizer
@ -170,17 +170,17 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
self.__inf_or_nan_tracker: Tensor = torch.zeros(
1,
dtype=torch.bool,
device=torch.cuda.current_device(),
device=get_accelerator().current_device_name(),
requires_grad=False)
self.deepspeed_adam_offload = (self.offload_optimizer
and type(init_optimizer) == DeepSpeedCPUAdam)
self.device = torch.cuda.current_device(
self.device = get_accelerator().current_device_name(
) if not self.offload_optimizer else OffloadDeviceEnum.cpu
### streams used for overlapping computation with communication
self.__reduce_and_partition_stream = Stream(
) if overlap_comm else torch.cuda.default_stream()
self.__reduce_and_partition_stream = get_accelerator().Stream(
) if overlap_comm else get_accelerator().default_stream()
############################################################################
@ -269,7 +269,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
self.__params_in_ipg_bucket: List[Parameter] = []
self.is_gradient_accumulation_boundary: bool = True
self.__param_reduce_events: Deque[Event] = collections.deque()
self.__param_reduce_events: Deque[get_accelerator().Event] = collections.deque()
# TODO. make this configurable via JSON
self.__max_param_reduce_events: int = 2
@ -386,18 +386,20 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
self.__ipg_bucket_flat_buffer: Tensor = torch.empty(
self.reduce_bucket_size,
dtype=self.dtype,
device=torch.cuda.current_device())
device=get_accelerator().current_device_name())
grad_partitions_flat_buffer = None
self.__param_id_to_grad_partition: Dict[int, Tensor] = {}
all_params = list(itertools.chain.from_iterable(self.fp16_groups))
grad_partitions_flat_buffer: Tensor = torch.zeros(
sum(p.partition_numel() for p in all_params),
dtype=self.dtype,
device=self.device,
pin_memory=self.offload_optimizer_pin_memory)
grad_partitions_flat_buffer: Tensor = torch.zeros(sum(p.partition_numel()
for p in all_params),
dtype=self.dtype,
device=self.device)
if self.offload_optimizer_pin_memory:
grad_partitions_flat_buffer = get_accelerator().pin_memory(
grad_partitions_flat_buffer)
offset = 0
for param in all_params:
@ -444,7 +446,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
offset += tensor_numel
gc.collect()
torch.cuda.empty_cache()
get_accelerator().empty_cache()
# copy tensors (now flattened and contiguous) back to GPU
device_buffer = cpu_buffer.to(orig_device)
@ -553,9 +555,9 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
print_rank_0(f"group {j} flat buffer size {flat_buffer_size}",
force=False)
self.param_groups_fp16_flat_cpu_memory.append(
torch.empty(int(flat_buffer_size),
dtype=self.dtype,
pin_memory=True))
get_accelerator().pin_memory(
torch.empty(int(flat_buffer_size),
dtype=self.dtype)))
else:
print_rank_0(
f"No flat buffer size. Param group size was {params_in_group}",
@ -925,7 +927,8 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
dtype=gradient_dtype,
device=self.device)
if self.offload_optimizer_pin_memory:
subgroup_gradient_buffer = subgroup_gradient_buffer.pin_memory()
subgroup_gradient_buffer = get_accelerator().pin_memory(
subgroup_gradient_buffer)
self.fp32_partitioned_groups_flat[i].grad = subgroup_gradient_buffer
else:
@ -1102,19 +1105,20 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
@instrument_w_nvtx
@torch.no_grad()
def __add_grad_to_ipg_bucket(self, param: Parameter) -> None:
self.__reduce_and_partition_stream.wait_stream(torch.cuda.default_stream())
self.__reduce_and_partition_stream.wait_stream(
get_accelerator().default_stream())
if self.contiguous_gradients and self.elements_in_ipg_bucket + param.grad.numel(
) < self.reduce_bucket_size:
# move the gradient to a contiguous buffer
with torch.cuda.stream(self.__reduce_and_partition_stream):
with get_accelerator().stream(self.__reduce_and_partition_stream):
# move the parameter's gradient to the contiguous flat buffer
new_grad_tensor = self.__ipg_bucket_flat_buffer.narrow(
0,
self.elements_in_ipg_bucket,
param.grad.numel()).view_as(param.grad)
new_grad_tensor.copy_(param.grad, non_blocking=True)
param.grad.record_stream(torch.cuda.current_stream())
param.grad.record_stream(get_accelerator().current_stream())
param.grad.data = new_grad_tensor
self.__params_in_ipg_bucket.append(param)
@ -1141,7 +1145,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
if len(self.__param_reduce_events) > self.__max_param_reduce_events:
self.__param_reduce_events.popleft().synchronize()
with torch.cuda.stream(self.__reduce_and_partition_stream):
with get_accelerator().stream(self.__reduce_and_partition_stream):
if safe_mode:
assert_ints_same_as_other_ranks(
[p.ds_id for p in self.__params_in_ipg_bucket])
@ -1151,7 +1155,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
self.__params_in_ipg_bucket.clear()
event = Event()
event = get_accelerator().Event()
event.record()
self.__param_reduce_events.append(event)
@ -1215,7 +1219,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
self.norm_for_param_grads[param_id] = self._constant_buffered_norm2(param.grad)
def async_inplace_copy_grad_to_fp32_buffer_from_gpu(self, param, fp32_grad_tensor):
with torch.cuda.stream(self.copy_grad_stream):
with get_accelerator().stream(self.copy_grad_stream):
param_id = self.get_param_id(param)
src_tensor = param.grad.view(-1).float()
#print(f"src_tensor {src_tensor.size()} and fp32 grad {fp32_grad_tensor.size()}")
@ -1233,7 +1237,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
total_norm += param_norm.item()**2
# Sum across all model parallel GPUs.
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
dist.all_reduce(total_norm_cuda,
op=dist.ReduceOp.SUM,
@ -1271,7 +1275,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
# ensure grad buffer is a CUDA buffer to speed up the next few
# operations and so it can be used asynchronously
grad_buffer = grad_buffer.to(grad_partition.device, non_blocking=True)
elif grad_buffer.is_cuda:
elif get_accelerator().on_accelerator(grad_buffer):
grad_buffer.add_(grad_partition)
else:
# if dst is CPU, copy first to src device, do the addition
@ -1316,7 +1320,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
fp32_grad_tensor.copy_(grad_buffer)
# free the gradient
param.grad.record_stream(torch.cuda.current_stream())
param.grad.record_stream(get_accelerator().current_stream())
param.grad = None
if self.offload_optimizer and self.swap_optimizer:
@ -1429,7 +1433,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
# if rank is specified do a reduction instead of an allreduce
def allreduce_and_copy(self, small_bucket, rank=None, log=None):
with torch.cuda.stream(self.reduction_stream):
with get_accelerator().stream(self.reduction_stream):
allreduced = self.allreduce_bucket(small_bucket, rank=rank, log=log)
if rank is None or rank == dist.get_rank(group=self.dp_process_group):
for buf, synced in zip(small_bucket, self.unflatten(allreduced, small_bucket)):
@ -1520,8 +1524,8 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
for group in self.fp16_groups:
for p in group:
if set_to_none:
if p.grad is not None and p.grad.is_cuda:
p.grad.record_stream(torch.cuda.current_stream())
if p.grad is not None and get_accelerator().on_accelerator(p.grad):
p.grad.record_stream(get_accelerator().current_stream())
p.grad = None
else:
if p.grad is not None:
@ -1557,7 +1561,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
norm_type = float(norm_type)
if norm_type == inf:
total_norm = max(g.data.abs().max() for g in gradients)
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
dist.all_reduce(total_norm_cuda,
op=dist.ReduceOp.MAX,
group=self.dp_process_group)
@ -1571,7 +1575,9 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
grad_norms = []
for g, p in zip(gradients, params):
if is_model_parallel_parameter(p) or (self.model_parallel_rank == 0):
grad_norms.append(g.cuda(non_blocking=True).double().norm(2))
grad_norms.append(
g.to(get_accelerator().device_name(),
non_blocking=True).double().norm(2))
# Sum across all model parallel GPUs.
total_norm_cuda = torch.sum(torch.pow(torch.stack(grad_norms), 2))
@ -1710,8 +1716,9 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
# release all the gradient since we have already created a necessary copy in dp_grad_partition
self.zero_grad(set_to_none=True)
for grad in filter(lambda g: g.is_cuda, self.averaged_gradients[sub_group_id]):
grad.record_stream(torch.cuda.current_stream())
for grad in filter(lambda g: get_accelerator().on_accelerator(g),
self.averaged_gradients[sub_group_id]):
grad.record_stream(get_accelerator().current_stream())
self.averaged_gradients[sub_group_id] = None
@ -1931,9 +1938,8 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
self._post_step(timer_names)
# warn user about caching allocator flushes
alloc_retries = torch.cuda.memory_stats()["num_alloc_retries"] if hasattr(
torch.cuda,
"memory_stats") else 0
memory_stats = get_accelerator().memory_stats()
alloc_retries = memory_stats["num_alloc_retries"] if memory_stats != None else 0
if alloc_retries > self.__n_caching_allocator_flushes:
if dist.get_rank() == 0:
logger.warning(
@ -1942,7 +1948,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
"performance. if this is happening frequently consider adjusting "
"settings to reduce memory consumption. If you are unable to "
"make the cache flushes go away consider adding "
"torch.cuda.empty_cache() calls in your training loop to ensure "
"get_accelerator().empty_cache() calls in your training loop to ensure "
"that all ranks flush their caches at the same time",
alloc_retries - self.__n_caching_allocator_flushes)
self.__n_caching_allocator_flushes = alloc_retries
@ -2013,13 +2019,13 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
@instrument_w_nvtx
def has_overflow(self, partition_gradients=True):
if partition_gradients:
with torch.cuda.stream(self.__reduce_and_partition_stream):
with get_accelerator().stream(self.__reduce_and_partition_stream):
self.local_overflow = bool(self.__inf_or_nan_tracker.item())
self.__inf_or_nan_tracker.zero_()
overflow = self.local_overflow
#overflow = self.has_overflow_partitioned_grads_serial()
overflow_gpu = torch.cuda.ByteTensor([overflow])
overflow_gpu = get_accelerator().ByteTensor([overflow])
dist.all_reduce(overflow_gpu,
op=dist.ReduceOp.MAX,
group=self.dp_process_group)
@ -2031,7 +2037,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
params.append(param)
overflow = self.has_overflow_serial(params, is_grad_list=partition_gradients)
overflow_gpu = torch.cuda.ByteTensor([overflow])
overflow_gpu = get_accelerator().ByteTensor([overflow])
# Since each model parallel GPU carries only part of the model,
# make sure overflow flag is synced across all the model parallel GPUs
@ -2371,7 +2377,7 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
will call ``model.load_state_dict()`` before
``fp16_optimizer_instance.load_state_dict()`` is called.
Example::
model = torch.nn.Linear(D_in, D_out).cuda().half()
model = torch.nn.Linear(D_in, D_out).to(get_accelerator().device_name()).half()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
optimizer = FP16_Optimizer(optimizer, static_loss_scale = 128.0)
...
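
Several stage-3 hunks above pair get_accelerator().on_accelerator(...) with record_stream(get_accelerator().current_stream()) before dropping a gradient. A short sketch of why that pairing matters (illustrative only, not the optimizer code itself):

import torch
from deepspeed.accelerator import get_accelerator

acc = get_accelerator()
grad = torch.ones(8, device=acc.device_name())

if acc.on_accelerator(grad):
    # keep the caching allocator from recycling this memory while a kernel
    # queued on the current stream may still be reading it
    grad.record_stream(acc.current_stream())
grad = None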


@ -22,12 +22,14 @@ from deepspeed.runtime.utils import (bwc_tensor_model_parallel_rank,
from deepspeed.runtime.zero.config import ZeroStageEnum
from deepspeed.runtime.zero.offload_config import OffloadDeviceEnum
from deepspeed.ops.adam import DeepSpeedCPUAdam
from deepspeed.ops.op_builder import UtilsBuilder
from deepspeed.utils import logger
from deepspeed.moe.utils import is_moe_param
from deepspeed.git_version_info import version
from deepspeed.runtime.constants import PIPE_REPLICATED
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder import UtilsBuilder
from deepspeed.checkpoint.constants import (DS_VERSION,
GROUP_PADDINGS,
PARTITION_COUNT,
@ -49,11 +51,12 @@ def input(msg):
def split_half_float_double(tensors):
device_type = get_accelerator().device_name()
dtypes = [
"torch.cuda.HalfTensor",
"torch.cuda.FloatTensor",
"torch.cuda.DoubleTensor",
"torch.cuda.BFloat16Tensor"
"torch.{}.HalfTensor".format(device_type),
"torch.{}.FloatTensor".format(device_type),
"torch.{}.DoubleTensor".format(device_type),
"torch.{}.BFloat16Tensor".format(device_type)
]
buckets = []
for i, dtype in enumerate(dtypes):
@ -154,8 +157,8 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
# - assume all params requires grad
# - flat by groups, not keeping state. TODO: remove state explicitly?
# - master grad and unflat master weight never exist. TODO: a way to save out unflat master?
if not torch.cuda.is_available():
raise SystemError("Cannot use fp16 without CUDA.")
if not get_accelerator().is_available():
raise SystemError("Cannot use fp16 without accelerator.")
self.optimizer = init_optimizer
# Load pre-built or JIT compile (un)flatten ops
@ -176,7 +179,8 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
self.deepspeed_adam_offload = cpu_offload
self.device = torch.cuda.current_device() if not self.cpu_offload else 'cpu'
self.device = get_accelerator().current_device_name(
) if not self.cpu_offload else 'cpu'
self.dp_process_group = dp_process_group
@ -320,8 +324,8 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
self.flatten_dense_tensors_aligned(
self.round_robin_bit16_groups[i],
self.nccl_start_alignment_factor *
dist.get_world_size(group=self.real_dp_process_group[i])).cuda(
torch.cuda.current_device()))
dist.get_world_size(group=self.real_dp_process_group[i])).to(
get_accelerator().current_device_name()))
see_memory_usage(f"After flattening and moving param group {i} to GPU",
force=False)
@ -395,10 +399,11 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
self.reduce_bucket_size = int(reduce_bucket_size)
self.allgather_bucket_size = int(allgather_bucket_size)
self.reduction_event = torch.cuda.Event(enable_timing=False, blocking=False)
self.reduction_stream = torch.cuda.Stream()
self.cpu_computation_stream = torch.cuda.Stream()
self.copy_grad_stream = torch.cuda.Stream()
self.reduction_event = get_accelerator().Event(enable_timing=False,
blocking=False)
self.reduction_stream = get_accelerator().Stream()
self.cpu_computation_stream = get_accelerator().Stream()
self.copy_grad_stream = get_accelerator().Stream()
self.callback_queued = False
self.param_dict = {}
@ -443,13 +448,13 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
self.norm_for_param_grads = {}
self.local_overflow = False
self.grad_position = {}
self.temp_grad_buffer_for_cpu_offload = torch.zeros(
largest_param_numel,
device=self.device,
dtype=self.dtype).pin_memory()
self.temp_grad_buffer_for_cpu_offload = get_accelerator().pin_memory(
torch.zeros(largest_param_numel,
device=self.device,
dtype=self.dtype))
self.temp_grad_buffer_for_gpu_offload = torch.zeros(
largest_param_numel,
device=torch.cuda.current_device(),
device=get_accelerator().current_device_name(),
dtype=self.dtype)
for i, params_group in enumerate(self.bit16_groups):
self.get_grad_position(i,
@ -637,9 +642,8 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
int(self.partition_size[i]),
dtype=self.single_partition_of_fp32_groups[i].dtype,
device=self.device)
self.single_partition_of_fp32_groups[
i].grad = single_grad_partition.pin_memory(
) if self.cpu_offload else single_grad_partition
self.single_partition_of_fp32_groups[i].grad = get_accelerator().pin_memory(
single_grad_partition) if self.cpu_offload else single_grad_partition
self.optimizer.step()
@ -661,7 +665,7 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
self.ipg_buffer = []
buf_0 = torch.empty(int(self.reduce_bucket_size),
dtype=self.dtype,
device=torch.cuda.current_device())
device=get_accelerator().current_device_name())
self.ipg_buffer.append(buf_0)
self.ipg_index = 0
@ -722,7 +726,7 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
self.params_already_reduced[i] = False
if self.overlap_comm:
torch.cuda.synchronize()
get_accelerator().synchronize()
# It is safe to clear previously reduced grads of other partitions
self._clear_previous_reduced_grads()
@ -735,15 +739,16 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
self.first_offset[i],
self.partition_size[i],
dtype=self.dtype,
device=torch.cuda.current_device(),
device=get_accelerator().current_device_name(),
return_tensor_list=True)
else:
avg_new = self.get_flat_partition(self.params_in_partition[i],
self.first_offset[i],
self.partition_size[i],
dtype=self.dtype,
device=torch.cuda.current_device(),
return_tensor_list=True)
avg_new = self.get_flat_partition(
self.params_in_partition[i],
self.first_offset[i],
self.partition_size[i],
dtype=self.dtype,
device=get_accelerator().current_device_name(),
return_tensor_list=True)
for accumulated_grad, new_avg_grad in zip(self.averaged_gradients[i], avg_new):
accumulated_grad.add_(new_avg_grad)
@ -937,11 +942,11 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
def average_tensor(self, tensor):
if self.overlap_comm:
stream = self.reduction_stream
stream.wait_stream(torch.cuda.current_stream())
stream.wait_stream(get_accelerator().current_stream())
else:
stream = torch.cuda.current_stream()
stream = get_accelerator().current_stream()
with torch.cuda.stream(stream):
with get_accelerator().stream(stream):
if not self.reduce_scatter:
self.gradient_reduction_w_predivide(tensor)
return
@ -1080,9 +1085,10 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
#buffer for storing gradients for this parameter in CPU
def buffer_to_accumulate_to_in_cpu():
if not self.fp16_master_weights_and_gradients:
return torch.zeros(param.numel(),
dtype=param.dtype,
device=self.device).pin_memory()
return get_accelerator().pin_memory(
torch.zeros(param.numel(),
dtype=param.dtype,
device=self.device))
else:
return self.single_partition_of_fp32_groups[i].grad.view(-1).narrow(
0,
@ -1201,7 +1207,7 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
"""
# Sum across all model parallel GPUs.
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
dist.all_reduce(total_norm_cuda,
op=dist.ReduceOp.SUM,
group=self.dp_process_group)
@ -1240,9 +1246,10 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
total_size += param_in_partition.numel()
see_memory_usage(f"before copying {total_size} gradients into partition")
self.grads_in_partition = torch.empty(int(total_size),
dtype=self.dtype,
device=torch.cuda.current_device())
self.grads_in_partition = torch.empty(
int(total_size),
dtype=self.dtype,
device=get_accelerator().current_device_name())
see_memory_usage(f"after copying {total_size} gradients into partition")
# The allreduce buffer will be rewritten. Copy the gradients in partition to a new buffer
@ -1276,13 +1283,13 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
stream = self.reduction_stream
elif self.cpu_offload:
# TODO: copy_grad_stream is disabled because of race with reduce. This hurts perf and should be fixed.
# torch.cuda.synchronize()
# get_accelerator().synchronize()
# stream = self.copy_grad_stream
stream = torch.cuda.current_stream()
stream = get_accelerator().current_stream()
else:
stream = torch.cuda.current_stream()
stream = get_accelerator().current_stream()
with torch.cuda.stream(stream):
with get_accelerator().stream(stream):
for _, param, param_id in self.params_in_ipg_bucket:
assert self.params_already_reduced[param_id] == False, \
@ -1424,14 +1431,14 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
# if rank is specified do a reduction instead of an allreduce
def allreduce_and_copy(self, small_bucket, rank=None, log=None):
if self.overlap_comm:
torch.cuda.synchronize()
get_accelerator().synchronize()
# It is safe to clear the previously reduced grads of other partitions
self._clear_previous_reduced_grads()
stream = self.reduction_stream
else:
stream = torch.cuda.current_stream()
stream = get_accelerator().current_stream()
with torch.cuda.stream(stream):
with get_accelerator().stream(stream):
allreduced = self.allreduce_bucket(small_bucket, rank=rank, log=log)
if rank is None or rank == dist.get_rank(group=self.dp_process_group):
for buf, synced in zip(small_bucket, self.unflatten(allreduced, small_bucket)):
@ -1569,7 +1576,7 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
norm_type = float(norm_type)
if norm_type == inf:
total_norm = max(g.data.abs().max() for g in gradients)
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
dist.all_reduce(total_norm_cuda,
op=dist.ReduceOp.MAX,
group=self.dp_process_group)
@ -1589,7 +1596,7 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
param_norm = g.data.double().norm(2)
total_norm += param_norm.item()**2
# Sum across all model parallel GPUs.
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
total_norm_cuda = get_accelerator().FloatTensor([float(total_norm)])
dist.all_reduce(total_norm_cuda,
op=dist.ReduceOp.SUM,
group=self.dp_process_group)
@ -1889,7 +1896,7 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
scaled_norm = norm * 1.0 / float(
dist.get_world_size(group=self.real_dp_process_group[i]))
scaled_norm_tensor = torch.tensor(scaled_norm,
device='cuda',
device=get_accelerator().device_name(),
dtype=torch.float)
dist.all_reduce(scaled_norm_tensor, group=self.real_dp_process_group[i])
norm_groups[i] = scaled_norm_tensor.item()
@ -1933,7 +1940,7 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
if partition_gradients:
overflow = self.local_overflow if self.cpu_offload else self.has_overflow_partitioned_grads_serial(
)
overflow_gpu = torch.cuda.ByteTensor([overflow])
overflow_gpu = get_accelerator().ByteTensor([overflow])
'''This will capture overflow across all data parallel and expert parallel process
Since expert parallel process are a subset of data parallel process'''
dist.all_reduce(overflow_gpu,
@ -1947,7 +1954,7 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
params.append(param)
overflow = self.has_overflow_serial(params, is_grad_list=partition_gradients)
overflow_gpu = torch.cuda.ByteTensor([overflow])
overflow_gpu = get_accelerator().ByteTensor([overflow])
# Since each model parallel GPU carries only part of the model,
# make sure overflow flag is synced across all the model parallel GPUs
@ -1992,14 +1999,14 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
self.ipg_buffer = []
buf_0 = torch.empty(int(self.reduce_bucket_size),
dtype=self.dtype,
device=torch.cuda.current_device())
device=get_accelerator().current_device_name())
self.ipg_buffer.append(buf_0)
# Use double buffers to avoid data access conflict when overlap_comm is enabled.
if self.overlap_comm:
buf_1 = torch.empty(int(self.reduce_bucket_size),
dtype=self.dtype,
device=torch.cuda.current_device())
device=get_accelerator().current_device_name())
self.ipg_buffer.append(buf_1)
self.ipg_index = 0
@ -2283,7 +2290,7 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
will call ``model.load_state_dict()`` before
``fp16_optimizer_instance.load_state_dict()`` is called.
Example::
model = torch.nn.Linear(D_in, D_out).cuda().half()
model = torch.nn.Linear(D_in, D_out).to(get_accelerator().device_name()).half()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
optimizer = FP16_Optimizer(optimizer, static_loss_scale = 128.0)
...
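
split_half_float_double above builds its bucket keys from the accelerator's device type rather than hard-coding 'torch.cuda.*Tensor'. A small check of that assumption on a CUDA backend:

import torch
from deepspeed.accelerator import get_accelerator

device_type = get_accelerator().device_name()   # e.g. 'cuda'
t = torch.ones(2, dtype=torch.float16, device=device_type)

# prints 'torch.cuda.HalfTensor' on CUDA, matching the
# "torch.{}.HalfTensor".format(device_type) bucket keys above
print(t.type())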


@ -7,6 +7,7 @@ from deepspeed.utils import logger
from deepspeed.ops.adam import DeepSpeedCPUAdam
from deepspeed.ops.adam import FusedAdam
from deepspeed.utils.nvtx import instrument_w_nvtx
from deepspeed.accelerator import get_accelerator
def _initialize_parameter_parallel_groups(parameter_parallel_size=None):
@ -63,8 +64,8 @@ def get_lst_from_rank0(lst: List[int]) -> None:
lst_tensor = torch.tensor(
lst if dist.get_rank() == 0 else [-1] * len(lst),
dtype=int,
# device=torch.cuda.current_device(),
device=torch.device('cuda:{}'.format(os.environ["LOCAL_RANK"])),
# device=get_accelerator().current_device_name(),
device=torch.device(get_accelerator().device_name(os.environ["LOCAL_RANK"])),
requires_grad=False,
)
dist.broadcast(lst_tensor, src=0, async_op=False)


@ -1,15 +1,13 @@
import torch
from deepspeed.accelerator import get_accelerator
def instrument_w_nvtx(func):
"""decorator that causes an NVTX range to be recorded for the duration of the
function call."""
if hasattr(torch.cuda.nvtx, "range"):
def wrapped_fn(*args, **kwargs):
get_accelerator().range_push(func.__qualname__)
ret_val = func(*args, **kwargs)
get_accelerator().range_pop()
return ret_val
def wrapped_fn(*args, **kwargs):
with torch.cuda.nvtx.range(func.__qualname__):
return func(*args, **kwargs)
return wrapped_fn
else:
return func
return wrapped_fn
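
With the rewrite above, instrument_w_nvtx always wraps the call in accelerator range_push/range_pop (NVTX on CUDA) instead of probing torch.cuda.nvtx. A usage sketch with a hypothetical function name:

from deepspeed.utils.nvtx import instrument_w_nvtx

@instrument_w_nvtx
def reduce_and_partition():
    ...   # runs inside a profiler range named after the function

reduce_and_partition()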


@ -3,9 +3,9 @@ Copyright 2019 The Microsoft DeepSpeed Team
"""
import time
import torch
from numpy import mean
from deepspeed.utils.logging import log_dist
from deepspeed.accelerator import get_accelerator
from deepspeed import comm as dist
try:
@ -18,12 +18,14 @@ except ImportError:
class CudaEventTimer(object):
def __init__(self, start_event: torch.cuda.Event, end_event: torch.cuda.Event):
def __init__(self,
start_event: get_accelerator().Event,
end_event: get_accelerator().Event):
self.start_event = start_event
self.end_event = end_event
def get_elapsed_msec(self):
torch.cuda.current_stream().wait_event(self.end_event)
get_accelerator().current_stream().wait_event(self.end_event)
self.end_event.synchronize()
return self.start_event.elapsed_time(self.end_event)
@ -42,14 +44,14 @@ class SynchronizedWallClockTimer:
def start(self):
"""Start the timer."""
assert not self.started_, f"{self.name_} timer has already been started"
self.start_event = torch.cuda.Event(enable_timing=True)
self.start_event = get_accelerator().Event(enable_timing=True)
self.start_event.record()
self.started_ = True
def stop(self, reset=False, record=False):
"""Stop the timer."""
assert self.started_, "timer is not started"
end_event = torch.cuda.Event(enable_timing=True)
end_event = get_accelerator().Event(enable_timing=True)
end_event.record()
self.event_timers.append(CudaEventTimer(self.start_event, end_event))
self.start_event = None
@ -100,14 +102,14 @@ class SynchronizedWallClockTimer:
@staticmethod
def memory_usage():
alloc = "mem_allocated: {:.4f} GB".format(torch.cuda.memory_allocated() /
alloc = "mem_allocated: {:.4f} GB".format(get_accelerator().memory_allocated() /
(1024 * 1024 * 1024))
max_alloc = "max_mem_allocated: {:.4f} GB".format(
torch.cuda.max_memory_allocated() / (1024 * 1024 * 1024))
cache = "cache_allocated: {:.4f} GB".format(torch.cuda.memory_cached() /
get_accelerator().max_memory_allocated() / (1024 * 1024 * 1024))
cache = "cache_allocated: {:.4f} GB".format(get_accelerator().memory_cached() /
(1024 * 1024 * 1024))
max_cache = "max_cache_allocated: {:.4f} GB".format(
torch.cuda.max_memory_cached() / (1024 * 1024 * 1024))
get_accelerator().max_memory_cached() / (1024 * 1024 * 1024))
return " | {} | {} | {} | {}".format(alloc, max_alloc, cache, max_cache)
def log(self, names, normalizer=1.0, reset=True, memory_breakdown=False, ranks=None):
@ -173,7 +175,7 @@ class ThroughputTimer:
self._init_timer()
self.started = True
if self.global_step_count >= self.start_step:
torch.cuda.synchronize()
get_accelerator().synchronize()
self.start_time = time.time()
def stop(self, global_step=False, report_speed=True):
@ -185,7 +187,7 @@ class ThroughputTimer:
self.global_step_count += 1
if self.start_time > 0:
torch.cuda.synchronize()
get_accelerator().synchronize()
self.end_time = time.time()
duration = self.end_time - self.start_time
self.total_elapsed_time += duration
@ -201,9 +203,9 @@ class ThroughputTimer:
self.global_step_count,
self.avg_samples_per_sec(),
self.batch_size / self.step_elapsed_time,
round(torch.cuda.memory_allocated() / 1024**3,
round(get_accelerator().memory_allocated() / 1024**3,
2),
round(torch.cuda.max_memory_allocated() / 1024**3,
round(get_accelerator().max_memory_allocated() / 1024**3,
2),
))
if self.monitor_memory:
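
The timer changes keep the event-based measurement scheme but source the events from the accelerator. A condensed sketch of the same elapsed-time recipe:

from deepspeed.accelerator import get_accelerator

acc = get_accelerator()
start = acc.Event(enable_timing=True)
end = acc.Event(enable_timing=True)

start.record()
# ... timed work on the current stream ...
end.record()

acc.current_stream().wait_event(end)
end.synchronize()
print(f"elapsed: {start.elapsed_time(end):.3f} ms")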


@ -1,17 +1,50 @@
"""
Copyright 2020 The Microsoft DeepSpeed Team
"""
from .cpu_adam import CPUAdamBuilder
from .cpu_adagrad import CPUAdagradBuilder
from .fused_adam import FusedAdamBuilder
from .fused_lamb import FusedLambBuilder
from .sparse_attn import SparseAttnBuilder
from .transformer import TransformerBuilder
from .random_ltd import RandomLTDBuilder
from .stochastic_transformer import StochasticTransformerBuilder
from .utils import UtilsBuilder
from .async_io import AsyncIOBuilder
from .transformer_inference import InferenceBuilder
from .quantizer import QuantizerBuilder
from .spatial_inference import SpatialInferenceBuilder
import sys
import os
import pkgutil
import importlib
from .builder import get_default_compute_capabilities, OpBuilder
# List of all available op builders from deepspeed op_builder
try:
import deepspeed.ops.op_builder # noqa: F401
op_builder_dir = "deepspeed.ops.op_builder"
except ImportError:
op_builder_dir = "op_builder"
__op_builders__ = []
this_module = sys.modules[__name__]
def builder_closure(member_name):
if op_builder_dir == "op_builder":
# at installation time the op builder class cannot be fetched because torch
# is not installed yet, so return a closure that defers the lookup
def _builder():
from deepspeed.accelerator import get_accelerator
builder = get_accelerator().create_op_builder(member_name)
return builder
return _builder
else:
# during runtime, return op builder class directly
from deepspeed.accelerator import get_accelerator
builder = get_accelerator().get_op_builder(member_name)
return builder
# reflect builder names and bind each one to a builder closure, so that e.g. 'TransformerBuilder()' creates the op builder for the current accelerator
for _, module_name, _ in pkgutil.iter_modules([os.path.dirname(this_module.__file__)]):
if module_name != 'all_ops' and module_name != 'builder':
module = importlib.import_module(f".{module_name}", package=op_builder_dir)
for member_name in module.__dir__():
if member_name.endswith(
'Builder'
) and member_name != "OpBuilder" and member_name != "CUDAOpBuilder":
# bind the builder name to a module-level variable of the same name,
# e.g. the following is equivalent to TransformerBuilder = builder_closure('TransformerBuilder')
this_module.__dict__[member_name] = builder_closure(member_name)
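
After this reflection runs, callers keep the familiar ...Builder().NAME style: at runtime the name is bound to the class returned by get_op_builder, while at pip-install time it is bound to a closure that defers the lookup until torch is available. A hedged usage sketch, reusing the UtilsBuilder / (un)flatten ops referenced elsewhere in this diff:

from deepspeed.ops.op_builder import UtilsBuilder

util_ops = UtilsBuilder().load()     # pre-built binary or JIT compile
flatten, unflatten = util_ops.flatten, util_ops.unflatten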


@ -19,7 +19,7 @@ __op_builders__ = []
for _, module_name, _ in pkgutil.iter_modules([os.path.dirname(op_builder_module.__file__)]):
# avoid self references
if module_name != 'all_ops' and module_name != 'builder' and module_name != 'builder_names':
if module_name != 'all_ops' and module_name != 'builder':
module = importlib.import_module("{}.{}".format(op_builder_dir, module_name))
for member_name in module.__dir__():
if member_name.endswith('Builder'):


@ -1,30 +0,0 @@
import sys
import os
import pkgutil
import importlib
# List of all available op builders from deepspeed op_builder
try:
import op_builder # noqa: F401
op_builder_dir = "op_builder"
except ImportError:
op_builder_dir = "deepspeed.ops.op_builder"
op_builder_module = importlib.import_module(op_builder_dir)
__op_builders__ = []
this_module = sys.modules[__name__]
# reflect all builder names into variable definition such as 'TransformerBuilder = "TransformerBuilder"'
for _, module_name, _ in pkgutil.iter_modules([os.path.dirname(op_builder_module.__file__)]):
# avoid self references
if module_name != 'all_ops' and module_name != 'builder' and module_name != 'builder_names':
module = importlib.import_module("{}.{}".format(op_builder_dir, module_name))
for member_name in module.__dir__():
if member_name.endswith(
'Builder'
) and member_name != "OpBuilder" and member_name != "CUDAOpBuilder":
# assign builder name to variable with same name
# the following is equivalent to i.e. TransformerBuilder = "TransformerBuilder"
this_module.__dict__[member_name] = member_name