* zero infinity xpu support

* remove env var depends

* client align mem

* sync with all accelerators

* format fix

* add align in pin_memory api

* add missing brackets

* remove align

* modify pin_memory api

* modify pin_memory api to use only on align para

* change value of align bytes

* Update csrc/aio/common/deepspeed_aio_common.cpp

* add version check and change format

---------

Co-authored-by: Olatunji Ruwase <olruwase@microsoft.com>
Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com>
This commit is contained in:
Liangliang-Ma 2023-10-03 21:17:08 +08:00 committed by GitHub
Parent 2f73b834b5
Commit 1760627eb9
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 53 additions and 24 deletions

View file

@ -226,7 +226,11 @@ class DeepSpeedAccelerator(ABC):
...
@abc.abstractmethod
def pin_memory(self, tensor):
def pin_memory(self, tensor, align_bytes=1):
...
@abc.abstractmethod
def is_pinned(self, tensor):
...
@abc.abstractmethod

View file

@ -225,9 +225,12 @@ class CPU_Accelerator(DeepSpeedAccelerator):
def LongTensor(self):
return torch.LongTensor
def pin_memory(self, tensor):
def pin_memory(self, tensor, align_bytes=1):
return tensor
def is_pinned(self, tensor):
return tensor.is_pinned()
def op_builder_dir(self):
try:
# is op_builder from deepspeed or a 3p version? this should only succeed if it's deepspeed

View file

@ -210,9 +210,12 @@ class CUDA_Accelerator(DeepSpeedAccelerator):
def LongTensor(self):
return torch.cuda.LongTensor
def pin_memory(self, tensor):
def pin_memory(self, tensor, align_bytes=1):
return tensor.pin_memory()
def is_pinned(self, tensor):
return tensor.is_pinned()
def on_accelerator(self, tensor):
device_str = str(tensor.device)
if device_str.startswith('cuda:'):

View file

@ -192,9 +192,12 @@ class MPS_Accelerator(DeepSpeedAccelerator):
def LongTensor(self):
return
def pin_memory(self, tensor):
def pin_memory(self, tensor, align_bytes=1):
return tensor.pin_memory()
def is_pinned(self, tensor):
return tensor.is_pinned()
def on_accelerator(self, tensor):
device_str = str(tensor.device)
if device_str.startswith("mps"):

View file

@ -191,9 +191,12 @@ class NPU_Accelerator(DeepSpeedAccelerator):
def LongTensor(self):
return torch.npu.LongTensor
def pin_memory(self, tensor):
def pin_memory(self, tensor, align_bytes=1):
return tensor.pin_memory()
def is_pinned(self, tensor):
return tensor.is_pinned()
def on_accelerator(self, tensor):
device_str = str(tensor.device)
if device_str.startswith('npu:'):

View file

@ -115,10 +115,13 @@ static int _do_io_complete(const long long int min_completes,
std::vector<std::chrono::duration<double>>& reap_times)
{
const auto start_time = std::chrono::high_resolution_clock::now();
const auto n_completes = io_getevents(
aio_ctxt->_io_ctxt, min_completes, max_completes, aio_ctxt->_io_events.data(), nullptr);
long long int n_completes = io_pgetevents(aio_ctxt->_io_ctxt,
min_completes,
max_completes,
aio_ctxt->_io_events.data(),
nullptr,
nullptr);
reap_times.push_back(std::chrono::high_resolution_clock::now() - start_time);
assert(n_completes >= min_completes);
return n_completes;
}

View file

@ -24,7 +24,8 @@ io_op_desc_t::io_op_desc_t(const bool read_op,
_num_bytes(num_bytes),
_validate(validate)
{
_cpu_buffer = _buffer.is_cuda() ? _buffer.to(torch::kCPU).pin_memory() : _buffer;
_cpu_buffer = (_buffer.is_cuda() || _buffer.is_xpu()) ? _buffer.to(torch::kCPU).pin_memory()
: _buffer;
_contiguous_buffer = _cpu_buffer.contiguous();
}
@ -33,6 +34,7 @@ char* io_op_desc_t::data_ptr() const { return (char*)_contiguous_buffer.data_ptr
// Finalize a completed I/O request: for a read op whose destination tensor
// lives on an accelerator (CUDA or XPU), copy the pinned CPU staging buffer
// (_cpu_buffer) back into the device-resident _buffer. Write ops and
// CPU-resident reads need no copy-back.
void io_op_desc_t::fini()
{
// A tensor is at most one of is_cuda()/is_xpu(), so at most one branch fires.
if (_read_op && _buffer.is_cuda()) { _buffer.copy_(_cpu_buffer.to(torch::kCUDA)); }
if (_read_op && _buffer.is_xpu()) { _buffer.copy_(_cpu_buffer.to(torch::kXPU)); }
}
deepspeed_aio_thread_t::deepspeed_aio_thread_t(const int tid, deepspeed_aio_config_t& aio_config)

View file

@ -10,6 +10,7 @@ import torch
from deepspeed import comm as dist
from deepspeed.utils.logging import logger
from deepspeed.runtime.swap_tensor.utils import swap_out_tensors, SwapBuffer
from deepspeed.accelerator import get_accelerator
INVALID_BUFFER_INDEX = -1
ASYNC_SWAPPER_WAIT_TIMER = 'async_swap_gradient_wait'
@ -37,7 +38,7 @@ class AsyncTensorSwapper(object):
def add_buffers(self, buffer_list):
assert len(self.all_buffers) == 0
assert all([buffer.is_pinned() for buffer in buffer_list])
assert all([get_accelerator().is_pinned(buffer) for buffer in buffer_list])
dtype = buffer_list[0].dtype
assert all([buffer.dtype == dtype for buffer in buffer_list])

View file

@ -15,6 +15,7 @@ from deepspeed.runtime.swap_tensor.constants import *
from deepspeed.runtime.swap_tensor.utils import swap_in_tensors, swap_out_tensors, \
MIN_AIO_BYTES, AIO_ALIGNED_BYTES, get_sized_buffers
from deepspeed.runtime.swap_tensor.utils import SwapBufferManager, SwapBufferPool
from deepspeed.accelerator import get_accelerator
class FlattenedTensorSwapInfo(object):
@ -90,7 +91,7 @@ class OptimizerStateSwapInfo(object):
return [grad.path for grad in self.swapped_gradients.values()]
def get_unpinned_state_tensors(self):
return [t for t in self.tensors if not t.is_pinned()]
return [t for t in self.tensors if not get_accelerator().is_pinned(t)]
def read_unswapped_gradients(self, dest_buffer):
num_elem_count = 0
@ -216,7 +217,7 @@ class OptimizerSwapper(object):
fp16_pinned_buffers, fp32_parameters):
assert len(fp32_parameters) == len(fp16_partitions_info)
assert len(fp32_parameters) == len(fp16_num_elems)
assert all([buffer.is_pinned() for buffer in fp16_pinned_buffers])
assert all([get_accelerator().is_pinned(buffer) for buffer in fp16_pinned_buffers])
fp32_swap_paths = self._get_swap_paths(parameters=fp32_parameters, num_elems=fp16_num_elems)

View file

@ -17,6 +17,7 @@ from deepspeed.runtime.swap_tensor.utils import swap_in_tensors, swap_out_tensor
get_sized_buffers
from deepspeed.runtime.swap_tensor.async_swapper import AsyncTensorSwapper
from deepspeed.runtime.swap_tensor.optimizer_utils import OptimizerSwapper
from deepspeed.accelerator import get_accelerator
DEBUG_MODE = False
@ -174,7 +175,7 @@ class PartitionedOptimizerSwapper(OptimizerSwapper):
unpinned_paths = []
for tensor, path in zip(swap_info.tensors, swap_info.swap_paths):
if tensor.is_pinned():
if get_accelerator().is_pinned(tensor):
pinned_tensors.append(tensor)
pinned_paths.append(path)
else:
@ -206,7 +207,7 @@ class PartitionedOptimizerSwapper(OptimizerSwapper):
if not (swap_info and swap_info.has_gradients()):
return
assert dest_buffer.is_pinned()
assert get_accelerator().is_pinned(dest_buffer)
assert parameter.numel() <= dest_buffer.numel()
parameter.grad = dest_buffer.narrow(0, 0, parameter.numel())

View file

@ -104,10 +104,11 @@ class AsyncPartitionedParameterSwapper(object):
self.available_buffer_ids = [i for i in range(self.param_buffer_count)]
self.reserved_buffer_ids = []
self.buffers = get_accelerator().pin_memory(
torch.empty(int(self.aligned_elements_per_buffer * self.param_buffer_count),
dtype=self.dtype,
requires_grad=False))
self.buffers = get_accelerator().pin_memory(torch.empty(int(self.aligned_elements_per_buffer *
self.param_buffer_count),
dtype=self.dtype,
requires_grad=False),
align_bytes=0)
self.aio_read_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH],
self.aio_config[AIO_SINGLE_SUBMIT], self.aio_config[AIO_OVERLAP_EVENTS],
@ -313,7 +314,8 @@ class AsyncPartitionedParameterSwapper(object):
def swap_into_buffer(self, param, dest_buffer):
assert param.ds_tensor.status == PartitionedParamStatus.NOT_AVAILABLE, f"param {param.ds_id} is already available or inflight"
require_swap_buffer = not (dest_buffer.is_pinned() and self._is_io_aligned(dest_buffer.numel()))
require_swap_buffer = not (get_accelerator().is_pinned(dest_buffer)
and self._is_io_aligned(dest_buffer.numel()))
if require_swap_buffer:
assert len(self.available_buffer_ids) > 0, f"No buffer available to swap param {param.ds_id}."
@ -378,8 +380,10 @@ class AsyncPartitionedParameterSwapper(object):
def reserve_partitioned_swap_space(self, partition_num_elems):
aligned_numel = sum([self._io_aligned_numel(numel) for numel in partition_num_elems])
self.partitioned_swap_buffer = get_accelerator().pin_memory(
torch.zeros(aligned_numel, device='cpu', dtype=self.dtype))
self.partitioned_swap_buffer = get_accelerator().pin_memory(torch.zeros(aligned_numel,
device='cpu',
dtype=self.dtype),
align_bytes=0)
self.partitioned_swap_pool = SwapBufferPool([self.partitioned_swap_buffer])
def swap_out_partitioned_params(self, dst_fp16_params, src_fp32_params):

View file

@ -96,7 +96,7 @@ class SwapBuffer(object):
class SwapBufferPool(object):
def __init__(self, buffers):
assert all([buf.is_pinned() for buf in buffers])
assert all([get_accelerator().is_pinned(buf) for buf in buffers])
self.buffers = [SwapBuffer(buf) for buf in buffers]
self.current_index = 0
@ -184,7 +184,8 @@ class SwapBufferManager(object):
self.count = count
self.dtype = dtype
self.all_buffers = [
get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype)) for _ in range(count)
get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype), align_bytes=0)
for _ in range(count)
]
self.free_buffer_index = [i for i in range(count)]
self.used_buffer_index = {}

View file

@ -79,7 +79,7 @@ class AsyncIOBuilder(OpBuilder):
# which is a function provided by libaio that is used in the async_io op.
# If needed, one can define -I and -L entries in CFLAGS and LDFLAGS
# respectively to specify the directories for libaio.h and libaio.so.
aio_compatible = self.has_function('io_submit', ('aio', ))
aio_compatible = self.has_function('io_pgetevents', ('aio', ))
if verbose and not aio_compatible:
self.warning(f"{self.NAME} requires the dev libaio .so object and headers but these were not found.")