Do gradient allreduce aggregation with GPUDirect RDMA

1. MPI_Iallreduce with CUDA-aware MPI is not supported, so the GPUDirect RDMA path uses the blocking AllReduce instead.
ref: https://www.open-mpi.org/faq/?category=runcuda#mpi-apis-no-cuda
Junjie Qian 2017-02-25 20:35:08 -08:00
Parent 621052af8f
Commit bc04458bda
7 changed files with 173 additions and 70 deletions
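In effect, each gradient now takes one of three reduction paths, chosen by the new ShouldCopyDataToCPU/AllReduceGradients helpers in the diff below. A minimal annotated sketch of that selection (member names and calls follow the diff; staging-buffer details and the in-place case are elided, so treat this as illustrative rather than the literal implementation):

    // Per-gradient dispatch, simplified:
    if (m_nccl->IsSupported() && !dataOnCPU)
    {
        // 1) NCCL path: reduce GPU-resident buffers directly on the device.
        m_nccl->AllReduce(inputData, outputData, numElements);
    }
    else if (m_mpi->UseGpuGdr())
    {
        // 2) GPUDirect RDMA path: hand the device pointers to the blocking AllReduce.
        //    ShouldCopyDataToCPU() returns false, so no GPU->CPU staging copy is made;
        //    the blocking call is used because MPI_Iallreduce is not CUDA-aware (see ref above).
        m_mpi->AllReduce(inputData, outputData, numElements);
    }
    else
    {
        // 3) Existing path: GPU gradients were staged into page-locked CPU buffers beforehand,
        //    so inputData/outputData point at host memory; reduce asynchronously.
        allReduceRequests.push_back(MPI_Request());
        m_mpi->AllReduceAsync(inputData, outputData, numElements, &allReduceRequests.back());
    }

The new path is enabled at build time: configure --gdr=yes writes CUDA_GDR=1 into the build config, which adds -DUSE_CUDA_GDR, and MPIWrapperMpi::UseGpuGdr() reports true only on Unix builds compiled with that flag (see the Makefile, MPIWrapper, and configure hunks below).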

View file

@@ -181,6 +181,10 @@ endif
COMMON_FLAGS += -DUSE_MKL
endif
ifeq ($(CUDA_GDR),1)
COMMON_FLAGS += -DUSE_CUDA_GDR
endif
ifeq ("$(MATHLIB)","openblas")
INCLUDEPATH += $(OPENBLAS_PATH)/include
LIBPATH += $(OPENBLAS_PATH)/lib

View file

@@ -7,7 +7,6 @@
#include <functional>
#include "Basics.h"
#include "Constants.h"
#include "MPIWrapper.h"
#include "CNTKLibrary.h"
#include "DistributedCommunicator.h"
#include "CUDAPageLockedMemAllocator.h"
@@ -328,14 +327,14 @@ namespace CNTK
}
// Do the packing to reduce the number of MPI requests.
// Donot re-allocating the continous buffer is existing buffer size equals to required one.
m_aggregationBufferFloat = setContinousBuffer<float>(packedFloatGradientsIndex, packedFloatGradientsSizeInBytes, inputValues, outputValues,
// Do not re-allocate the continuous buffer if the existing buffer size equals the required one.
m_aggregationBufferFloat = SetContinuousBuffer<float>(packedFloatGradientsIndex, packedFloatGradientsSizeInBytes, inputValues, outputValues,
valuesToAggregate, valuesAfterAggregate);
m_aggregationBufferDouble = setContinousBuffer<double>(packedDoubleGradientsIndex, packedDoubleGradientsSizeInBytes, inputValues, outputValues,
m_aggregationBufferDouble = SetContinuousBuffer<double>(packedDoubleGradientsIndex, packedDoubleGradientsSizeInBytes, inputValues, outputValues,
valuesToAggregate, valuesAfterAggregate);
packToContinousBuffer(m_aggregationBufferFloat.get(), packedFloatGradientsIndex, inputValues, outputValues, valuesToAggregate, valuesAfterAggregate);
packToContinousBuffer(m_aggregationBufferDouble.get(), packedDoubleGradientsIndex, inputValues, outputValues, valuesToAggregate, valuesAfterAggregate);
PackToContinuousBuffer(m_aggregationBufferFloat.get(), packedFloatGradientsIndex, inputValues, outputValues, valuesToAggregate, valuesAfterAggregate);
PackToContinuousBuffer(m_aggregationBufferDouble.get(), packedDoubleGradientsIndex, inputValues, outputValues, valuesToAggregate, valuesAfterAggregate);
numValues = valuesToAggregate.size();
@@ -360,24 +359,15 @@ namespace CNTK
m_nccl.reset(new NcclComm(AsCNTKImplDeviceId(inputValues[0]->Device()), m_mpi));
}
// for all values residing on GPU initiate async transfer to CPU buffers.
for (auto i = 0; i < numValues; ++i)
{
auto view = valuesToAggregate[i];
if (!m_nccl->IsSupported() && view->Device() != DeviceDescriptor::CPUDevice())
{
auto& transferer = m_gpuDataTransferers[i];
auto& buffer = m_intermediateCPUBuffers[i];
transferer->CopyGPUToCPUAsync(GetDataBuffer(view), GetBufferSize(view), buffer.data.get());
}
}
// For all values residing on GPU initiate async transfer to CPU buffers if needed
CopyDataFromGPUToCPU(valuesToAggregate);
std::vector<MPI_Request> allReduceRequests;
for (auto i = 0; i < numValues; ++i)
{
auto inputValue = valuesToAggregate[i];
if (!m_nccl->IsSupported() && inputValue->Device() != DeviceDescriptor::CPUDevice())
if (ShouldCopyDataToCPU(inputValue))
{
// TODO: actually, we can start reducing all cpu values first, and then wait for the gpu->cpu transfer to finish.
m_gpuDataTransferers[i]->WaitForCopyGPUToCPUAsync();
@@ -392,39 +382,22 @@ namespace CNTK
assert(dataType == outputValue->GetDataType());
assert(inputValue->Device() == outputValue->Device());
if (!m_nccl->IsSupported() || inputValue->Device() == DeviceDescriptor::CPUDevice())
{
void* inputData = (inputValue->Device() != DeviceDescriptor::CPUDevice()) ? m_intermediateCPUBuffers[i].data.get() : GetDataBuffer(inputValue);
void* outputData = (inputValue->Device() != DeviceDescriptor::CPUDevice()) ? m_intermediateCPUBuffers[i].data.get() : GetDataBuffer(outputValue);
void* inputData = (ShouldCopyDataToCPU(inputValue)) ? m_intermediateCPUBuffers[i].data.get() : GetDataBuffer(inputValue);
void* outputData = (ShouldCopyDataToCPU(inputValue)) ? m_intermediateCPUBuffers[i].data.get() : GetDataBuffer(outputValue);
allReduceRequests.push_back(MPI_Request());
if (dataType == DataType::Float)
{
if (inputData == outputData)
m_mpi->AllReduceAsync(static_cast<float*>(outputData), numElements, &allReduceRequests.back());
else
m_mpi->AllReduceAsync(static_cast<float*>(inputData), static_cast<float*>(outputData), numElements, &allReduceRequests.back());
AllReduceGradients(static_cast<float*>(inputData), static_cast<float*>(outputData), numElements,
allReduceRequests, (inputValue->Device() == DeviceDescriptor::CPUDevice()));
}
else if (dataType == DataType::Double)
{
if (inputData == outputData)
m_mpi->AllReduceAsync(static_cast<double*>(outputData), numElements, &allReduceRequests.back());
else
m_mpi->AllReduceAsync(static_cast<double*>(inputData), static_cast<double*>(outputData), numElements, &allReduceRequests.back());
AllReduceGradients(static_cast<double*>(inputData), static_cast<double*>(outputData), numElements,
allReduceRequests, (inputValue->Device() == DeviceDescriptor::CPUDevice()));
}
else
LogicError("MPICommunicator: Unknown DataType.");
}
else
{
if (dataType == DataType::Float)
m_nccl->AllReduce(static_cast<float*>(GetDataBuffer(inputValue)), static_cast<float*>(GetDataBuffer(outputValue)), numElements);
else if (dataType == DataType::Double)
m_nccl->AllReduce(static_cast<double*>(GetDataBuffer(inputValue)), static_cast<double*>(GetDataBuffer(outputValue)), numElements);
else
LogicError("DistributedCommunicator: Unknown DataType.");
}
}
if (m_nccl->IsSupported())
{
@@ -448,7 +421,7 @@ namespace CNTK
assert(idx < valuesToAggregate.size());
auto value = valuesToAggregate[idx];
if (value->Device() != DeviceDescriptor::CPUDevice())
if (ShouldCopyDataToCPU(value))
{
auto view = valuesAfterAggregate[idx];
auto size = GetBufferSize(view);
@@ -457,18 +430,16 @@ namespace CNTK
transferer->CopyCPUToGPUAsync(buffer.data.get(), size, GetDataBuffer(view));
}
}
// TODO: Should not wait, simply publishing event on the compute stream should be sufficient.
// TODO: Should not wait, simply publishing event on the compute stream should be sufficient
for (auto i = 0; i < numValues; ++i)
{
if (valuesToAggregate[i]->Device() != DeviceDescriptor::CPUDevice())
if (ShouldCopyDataToCPU(valuesToAggregate[i]))
m_gpuDataTransferers[i]->WaitForCopyCPUToGPUAsync();
}
// Unpack the continuous buffer
unpackFromContinousBuffer(m_aggregationBufferFloat.get(), outputValues, packedFloatGradientsIndex);
unpackFromContinousBuffer(m_aggregationBufferDouble.get(), outputValues, packedDoubleGradientsIndex);
UnpackFromContinuousBuffer(m_aggregationBufferFloat.get(), outputValues, packedFloatGradientsIndex);
UnpackFromContinuousBuffer(m_aggregationBufferDouble.get(), outputValues, packedDoubleGradientsIndex);
}
void MPICommunicatorImpl::Barrier()
@@ -476,8 +447,34 @@ namespace CNTK
m_mpi->WaitAll();
}
bool MPICommunicatorImpl::ShouldCopyDataToCPU(NDArrayViewPtr inputValue)
{
if (inputValue->Device() == DeviceDescriptor::CPUDevice())
return false;
// Do not copy if NCCL is supported or GPUDirect RDMA is used
if (m_nccl->IsSupported() || m_mpi->UseGpuGdr())
return false;
return true;
}
void MPICommunicatorImpl::CopyDataFromGPUToCPU(std::vector<NDArrayViewPtr>& inputValues)
{
for (auto i = 0; i < inputValues.size(); ++i)
{
auto view = inputValues[i];
if (ShouldCopyDataToCPU(inputValues[i]))
{
auto& transferer = m_gpuDataTransferers[i];
auto& buffer = m_intermediateCPUBuffers[i];
transferer->CopyGPUToCPUAsync(GetDataBuffer(view), GetBufferSize(view), buffer.data.get());
}
}
}
template <typename ElemType>
std::unique_ptr<Matrix<ElemType>> MPICommunicatorImpl::setContinousBuffer(std::vector<size_t>& packedGradientsIndex, size_t packedGradientsSizeInBytes,
std::unique_ptr<Matrix<ElemType>> MPICommunicatorImpl::SetContinuousBuffer(std::vector<size_t>& packedGradientsIndex, size_t packedGradientsSizeInBytes,
const std::vector<NDArrayViewPtr>& inputValues, const std::vector<NDArrayViewPtr>& outputValues,
std::vector<NDArrayViewPtr>& valuesToAggregate, std::vector<NDArrayViewPtr>& valuesAfterAggregate)
{
@@ -496,7 +493,7 @@ namespace CNTK
}
template <typename ElemType>
void MPICommunicatorImpl::packToContinousBuffer(Matrix<ElemType>* aggregationBuffer, std::vector<size_t>& packedGradientsIndex,
void MPICommunicatorImpl::PackToContinuousBuffer(Matrix<ElemType>* aggregationBuffer, std::vector<size_t>& packedGradientsIndex,
const std::vector<NDArrayViewPtr>& inputValues, const std::vector<NDArrayViewPtr>& outputValues, std::vector<NDArrayViewPtr>& valuesToAggregate, std::vector<NDArrayViewPtr>& valuesAfterAggregate)
{
if (packedGradientsIndex.size() < 1)
@@ -530,7 +527,7 @@ namespace CNTK
}
template <typename ElemType>
void MPICommunicatorImpl::unpackFromContinousBuffer(Matrix<ElemType>* aggregationBuffer, const std::vector<NDArrayViewPtr>& outputValues,
void MPICommunicatorImpl::UnpackFromContinuousBuffer(Matrix<ElemType>* aggregationBuffer, const std::vector<NDArrayViewPtr>& outputValues,
std::vector<size_t>& packedGradientsIndex)
{
if (packedGradientsIndex.size() != 0)
@@ -544,4 +541,31 @@ namespace CNTK
}
}
}
template <typename ElemType>
void MPICommunicatorImpl::AllReduceGradients(ElemType* inputData, ElemType* outputData, size_t numElements, std::vector<MPI_Request> &allReduceRequests, bool dataOnCPU)
{
if (m_nccl->IsSupported() && !dataOnCPU)
{
m_nccl->AllReduce(inputData, outputData, numElements);
return;
}
if (m_mpi->UseGpuGdr())
{
if (inputData == outputData)
m_mpi->AllReduce(outputData, numElements);
else
m_mpi->AllReduce(inputData, outputData, numElements);
return;
}
allReduceRequests.push_back(MPI_Request());
if (inputData == outputData)
m_mpi->AllReduceAsync(outputData, numElements, &allReduceRequests.back());
else
m_mpi->AllReduceAsync(inputData, outputData, numElements, &allReduceRequests.back());
}
}

View file

@@ -8,6 +8,7 @@
#include "CNTKLibrary.h"
#include "Constants.h"
#include "NcclComm.h"
#include "MPIWrapper.h"
#include <MatrixQuantizerImpl.h>
namespace Microsoft { namespace MSR { namespace CNTK {
@@ -120,16 +121,22 @@ namespace CNTK
Microsoft::MSR::CNTK::MPIWrapperPtr m_mpi;
bool ShouldCopyDataToCPU(NDArrayViewPtr inputValue);
void CopyDataFromGPUToCPU(std::vector<NDArrayViewPtr>& inputValues);
template <typename ElemType>
std::unique_ptr<Microsoft::MSR::CNTK::Matrix<ElemType>> setContinousBuffer(std::vector<size_t>& packedGradientsIndex, size_t packedGradientsSizeInBytes,
std::unique_ptr<Microsoft::MSR::CNTK::Matrix<ElemType>> SetContinuousBuffer(std::vector<size_t>& packedGradientsIndex, size_t packedGradientsSizeInBytes,
const std::vector<NDArrayViewPtr>& inputValues, const std::vector<NDArrayViewPtr>& outputValues,
std::vector<NDArrayViewPtr>& valuesToAggregate, std::vector<NDArrayViewPtr>& valuesAfterAggregate);
template <typename ElemType>
void packToContinousBuffer(Microsoft::MSR::CNTK::Matrix<ElemType>* aggregationBuffer, std::vector<size_t>& packedGradientsIndex,
void PackToContinuousBuffer(Microsoft::MSR::CNTK::Matrix<ElemType>* aggregationBuffer, std::vector<size_t>& packedGradientsIndex,
const std::vector<NDArrayViewPtr>& inputValues, const std::vector<NDArrayViewPtr>& outputValues, std::vector<NDArrayViewPtr>& valuesToAggregate, std::vector<NDArrayViewPtr>& valuesAfterAggregate);
template <typename ElemType>
void unpackFromContinousBuffer(Microsoft::MSR::CNTK::Matrix<ElemType>* aggregationBuffer, const std::vector<NDArrayViewPtr>& outputValues, std::vector<size_t>& packedGradientsIndex);
void UnpackFromContinuousBuffer(Microsoft::MSR::CNTK::Matrix<ElemType>* aggregationBuffer, const std::vector<NDArrayViewPtr>& outputValues, std::vector<size_t>& packedGradientsIndex);
template <typename ElemType>
void AllReduceGradients(ElemType* inputData, ElemType* outputData, size_t numElements, std::vector<MPI_Request> &allReduceRequests, bool dataOnCPU);
};
}

View file

@@ -96,6 +96,9 @@ public:
virtual size_t MainNodeRank() const = 0;
virtual bool IsMultiHost() const = 0;
// Use GPUDirect RDMA support
virtual bool UseGpuGdr() = 0;
// -----------------------------------------------------------------------
// data-exchange functions (wrappers around MPI functions)
// -----------------------------------------------------------------------
@@ -111,7 +114,6 @@ public:
virtual int Abort(int errorcode) = 0;
virtual int Error_string(int errorcode, char* string, int* resultlen) = 0;
// helpers to determine the MPI_Datatype of a pointer
static MPI_Datatype GetDataType(char *);
static MPI_Datatype GetDataType(int *);

View file

@@ -71,6 +71,9 @@ public:
size_t MainNodeRank() const;
bool IsMultiHost() const;
// Use GPUDirect RDMA support
virtual bool UseGpuGdr() override;
// -----------------------------------------------------------------------
// data-exchange functions (wrappers around MPI functions)
// -----------------------------------------------------------------------
@@ -165,6 +168,8 @@ public:
bool UsingAllNodes() const;
size_t MainNodeRank() const;
bool IsMultiHost() const;
// Use GPUDirect RDMA
virtual bool UseGpuGdr() override;
// -----------------------------------------------------------------------
// data-exchange functions (wrappers around MPI functions)
@@ -681,6 +686,16 @@ int MPIWrapperMpi::Error_string(int errorcode, char* str, int* resultlen)
return MPI_Error_string(errorcode, str, resultlen);
}
bool MPIWrapperMpi::UseGpuGdr()
{
// GPUDirect RDMA is supported only on Unix and only when built with GDR
#if defined(USE_CUDA_GDR) && defined(__unix__)
return true;
#else
return false;
#endif
}
size_t MPIWrapperMpi::NumNodesInUse() const
{
return m_numNodesInUse;
@@ -987,6 +1002,11 @@ bool MPIWrapperEmpty::IsMultiHost() const
return false;
}
bool MPIWrapperEmpty::UseGpuGdr()
{
return false;
}
int MPIWrapperEmpty::Finalize(void)
{
return MPI_UNDEFINED;

View file

@@ -138,6 +138,19 @@ private:
});
}
bool ShouldCopyDataToCPU(int deviceId)
{
// Do not copy if data is on CPU
if (deviceId == CPUDEVICE)
return false;
// Do not copy if NCCL is supported or GPUDirect RDMA is used
if (m_nccl.IsSupported() || m_mpi->UseGpuGdr() == true)
return false;
return true;
}
void ResetState(const std::vector<Matrix<ElemType>*>& gradients, int numEvalNodes, bool resetState)
{
// When called the first time let's setup the intermediateCPU buffers for gradient aggregation if needed
@@ -146,8 +159,11 @@ private:
m_initialized = true;
int deviceId = gradients[0]->GetDeviceId();
if (!m_nccl.IsSupported() && (deviceId != CPUDEVICE))
// Initial preparation for data copy from GPU to CPU
if (ShouldCopyDataToCPU(deviceId))
{
m_allocator.reset(new CUDAPageLockedMemAllocator(deviceId));
}
size_t packedGradientsSizeInElements = 0;
for (size_t i = 0; i < gradients.size(); i++)
@@ -194,8 +210,7 @@ private:
m_gradientIndexToAggregate.insert(m_gradientIndexToAggregate.begin(), 1, (size_t)-1);
}
// If running on GPU and NCCL not supported, initialize GPU and CPU data transfer
if (!m_nccl.IsSupported() && (deviceId != CPUDEVICE))
if (ShouldCopyDataToCPU(deviceId))
{
for (size_t i : m_gradientIndexToAggregate)
{
@@ -274,7 +289,7 @@ private:
}
// Initiate transfer of the bufferred data to the CPU if needed
if (!m_nccl.IsSupported() && deviceId != CPUDEVICE)
if (ShouldCopyDataToCPU(deviceId))
{
size_t gpuDataTransfersIdx = 0;
Matrix<ElemType>* gpuCopyBuffer = m_aggregationBuffer.get();
@@ -321,16 +336,24 @@ private:
{
allReduceRequests.push_back(MPI_Request());
reductionBuffer = (i == -1)? m_aggregationBuffer->Data() : gradients[i]->Data();
if (deviceId != CPUDEVICE)
if (m_mpi->UseGpuGdr() == 0 && deviceId != CPUDEVICE)
{
m_gpuDataTransferers[allReduceIndex]->WaitForCopyGPUToCPUAsync();
reductionBuffer = m_intermediateCPUBuffers[allReduceIndex].get();
}
if (m_mpi->UseGpuGdr() == 0)
{
m_mpi->Iallreduce(MPI_IN_PLACE, reductionBuffer, (i == -1) ? m_aggregationBuffer->GetNumElements() : gradients[i]->GetNumElements(),
MPIWrapper::GetDataType(reductionBuffer), MPI_SUM, &allReduceRequests.back()) || MpiFail("MPI_Iallreduce");
allReduceIndex++;
}
// TODO: Remove this when MPI_Iallreduce with CUDA-aware is supported
else
{
m_mpi->AllReduce(reductionBuffer, (i == -1) ? m_aggregationBuffer->GetNumElements() : gradients[i]->GetNumElements());
}
}
}
else
{
@@ -370,7 +393,8 @@ private:
{
m_nccl.Sync();
}
else
// TODO: Remove this when MPI_Iallreduce with CUDA-aware is supported
else if (m_mpi->UseGpuGdr() == 0)
{
// Wait for the allreduce operations to finish and initiate transfer back to the GPU if needed
size_t gpuDataTransfersIdx = 0; // Index of allReduceRequest for each un-packed gradient

configure (vendored)
View file

@@ -45,6 +45,11 @@ protobuf_check=lib/libprotobuf.a
mpi_path=
mpi_check=include/mpi.h
# CUDA-aware MPI
# OpenMPI can auto-detect it, but MVAPICH2 cannot
cuda_gdr=no
default_cuda_gdr=$cuda_gdr
have_kaldi=no
kaldi_path=
kaldi_check=src/kaldi.mk
@@ -341,6 +346,7 @@ function show_help ()
echo " --cuda[=(yes|no)] use cuda GPU $(show_default $(default_use_cuda))"
echo " --python[=(yes|no)] with Python bindings $(show_default $(default_use_python))"
echo " --mpi[=(yes|no)] use MPI communication $(show_default ${default_use_mpi})"
echo " --gdr[=(yes|no)] use GPUDirect RDMA $(show_default ${default_cuda_gdr})"
echo " --with-cuda[=directory] $(show_default $(find_cuda))"
echo " --with-cub[=directory] $(show_default $(find_cub))"
echo " --with-gdk-include[=directory] $(show_default $(find_gdk_include))"
@@ -534,6 +540,17 @@ do
fi
;;
--gdr*)
if test x$optarg = xyes || test x$optarg = xno
then
cuda_gdr=$optarg
else
echo "Invalid value for --gdr $optarg"
show_help
exit
fi
;;
--with-cuda*)
enable_cuda=yes
if test x$optarg = x
@@ -1083,6 +1100,11 @@ if test $have_mpi = yes ; then
else
echo HAS_MPI=0 >> $config
fi
if test $cuda_gdr = yes ; then
echo CUDA_GDR=1 >> $config
else
echo CUDA_GDR=0 >> $config
fi
if test $enable_cuda = yes ; then
echo CUDA_PATH=$cuda_path >> $config
echo GDK_INCLUDE_PATH=$gdk_include_path >> $config