updating multiverso helper for the new matrix interface
Parent: 9760990506
Commit: 7323d7c519
@@ -479,7 +479,6 @@ int wmainWithBS(int argc, wchar_t* argv[]) // called from wmain which is a wrapp
 bool paralleltrain = config(L"parallelTrain", false);
 if (paralleltrain)
 mpi = MPIWrapper::GetInstance(true /*create*/);
-}

 g_shareNodeValueMatrices = config(L"shareNodeValueMatrices", false);

@@ -46,11 +46,11 @@ namespace Microsoft {
 };

 template<class ElemType = float>
-class MultiversoWrapper
+class MultiversoHelper
 {
 typedef shared_ptr<ComputationNode<ElemType>> ComputationNodePtr;
 public:
-MultiversoWrapper(const std::list<ComputationNodeBasePtr> & learnableNodes,
+MultiversoHelper(const std::list<ComputationNodeBasePtr> & learnableNodes,
 int MPINodeNum,
 bool isAsyncBuffered = true,
 AdjustLearningRateatBeginning adjusttype = AdjustLearningRateatBeginning::None,
@@ -74,7 +74,8 @@ namespace Microsoft {
 m_cpuAsyncBuffer = new ElemType*[m_localCacheNumber];
 #ifndef CPUONLY
 //GPU asynchronous buffer
-m_gpuAsyncBuffer = new Matrix<ElemType>**[m_localCacheNumber];
+//m_gpuAsyncBuffer = new Matrix<ElemType>**[m_localCacheNumber];
+m_gpuAsyncBuffer.resize(m_localCacheNumber);

 //creat an communication stream for the data tranfer between GPU and CPU
 CudaErrorCheck(cudaStreamCreate(&_commStream));
@@ -91,9 +92,9 @@ namespace Microsoft {
 MultiversoInit(learnableNodes);
 }

-~MultiversoWrapper()
+~MultiversoHelper()
 {
-fprintf(stderr, "~MultiversoWrapper\n");
+fprintf(stderr, "~MultiversoHelper\n");
 fflush(stderr);

 if (m_isUseAsyncBuffered && m_prefetchThread != nullptr && m_prefetchThread->joinable())
@@ -126,11 +127,17 @@ namespace Microsoft {
 {
 ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
 Matrix<ElemType> &mat = node->Value();
+printf("here!2\n");
+fflush(stdout);
+#pragma warning( push )
+#pragma warning( disable : 4238)

 #ifndef CPUONLY
 for (int j = 0; j < m_localCacheNumber; j++)
-m_gpuAsyncBuffer[j][i] = new Matrix<ElemType>(mat);
+m_gpuAsyncBuffer[j].push_back(mat.DeepClone());
+//m_gpuAsyncBuffer[j][i] = mat.DeepClone();
 #endif
+#pragma warning( pop )
 ElemType* px = m_cpuAsyncBuffer[0] + m_tableOffsets[i];
 mat.CopyToArray(px, m_tableLength[i]);
 }
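The substantive change in this hunk is how the GPU-side cache is filled: instead of heap-allocating a copy per slot with `new Matrix<ElemType>(mat)`, each slot now receives `mat.DeepClone()` pushed into a `std::vector`, so the cache owns its copies by value. A minimal sketch of that ownership pattern, using a stand-in `Mat` type rather than CNTK's `Matrix<ElemType>` (all names below are illustrative only):

```cpp
#include <cstdio>
#include <vector>

// Stand-in for CNTK's Matrix<ElemType>; only the members the diff relies on.
struct Mat
{
    std::vector<float> data;
    // Returns an independent copy, mirroring Matrix<ElemType>::DeepClone().
    Mat DeepClone() const { return Mat{data}; }
    size_t GetNumElements() const { return data.size(); }
};

int main()
{
    Mat model{{1.0f, 2.0f, 3.0f}};

    // Old layout (per the removed line): a heap-allocated copy per cache slot,
    //   m_gpuAsyncBuffer[j][i] = new Matrix<ElemType>(mat);
    // New layout (per the added line): value semantics inside a vector,
    //   m_gpuAsyncBuffer[j].push_back(mat.DeepClone());
    std::vector<std::vector<Mat>> gpuAsyncBuffer(2); // two async cache slots
    for (auto& slot : gpuAsyncBuffer)
        slot.push_back(model.DeepClone());

    std::printf("slots=%zu, elements per table=%zu\n",
                gpuAsyncBuffer.size(), gpuAsyncBuffer[0][0].GetNumElements());
    return 0;
}
```

With value semantics the destructor no longer has to walk the array and delete each matrix; the vectors release the clones on their own.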
@@ -178,14 +185,14 @@ namespace Microsoft {
 Microsoft::MSR::CNTK::Matrix<ElemType> &mat = node->Value();
 #ifndef CPUONLY
 //CNTK model -> GPU buffer
-CudaErrorCheck(cudaMemcpy(m_gpuAsyncBuffer[m_bufferInUse][i]->BufferPointer(),
-mat.BufferPointer(),
+CudaErrorCheck(cudaMemcpy(m_gpuAsyncBuffer[m_bufferInUse][i].Data(),
+mat.Data(),
 mat.GetNumElements() * sizeof(ElemType),
 cudaMemcpyDeviceToDevice));

 //GPU buffer -> CNTK model
-CudaErrorCheck(cudaMemcpy(mat.BufferPointer(),
-m_gpuAsyncBuffer[m_cacheSwapIndex[m_bufferInUse]][i]->BufferPointer(),
+CudaErrorCheck(cudaMemcpy(mat.Data(),
+m_gpuAsyncBuffer[m_cacheSwapIndex[m_bufferInUse]][i].Data(),
 mat.GetNumElements() * sizeof(ElemType),
 cudaMemcpyDeviceToDevice));
 #else
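The rest of this hunk is the mechanical rename from `BufferPointer()` to `Data()` on the new matrix interface; the copy directions and byte counts are unchanged. A hedged sketch of the same device-to-device round trip in plain CUDA, with raw `float*` buffers standing in for the storage returned by `Data()`:

```cpp
#include <cstdio>
#include <cuda_runtime.h>

// Minimal error-check helper in the spirit of the CudaErrorCheck used in the diff.
#define CUDA_CHECK(expr)                                                        \
    do {                                                                        \
        cudaError_t err = (expr);                                               \
        if (err != cudaSuccess) {                                               \
            std::fprintf(stderr, "CUDA error: %s\n", cudaGetErrorString(err));  \
            return 1;                                                           \
        }                                                                       \
    } while (0)

int main()
{
    const size_t numElements = 1024;
    float *modelBuf = nullptr, *cacheBuf = nullptr;
    CUDA_CHECK(cudaMalloc((void**)&modelBuf, numElements * sizeof(float)));
    CUDA_CHECK(cudaMalloc((void**)&cacheBuf, numElements * sizeof(float)));

    // CNTK model -> GPU buffer: in the diff this is
    //   cudaMemcpy(m_gpuAsyncBuffer[m_bufferInUse][i].Data(), mat.Data(), ...)
    CUDA_CHECK(cudaMemcpy(cacheBuf, modelBuf,
                          numElements * sizeof(float), cudaMemcpyDeviceToDevice));

    // GPU buffer -> CNTK model: the reverse copy from the swapped cache slot.
    CUDA_CHECK(cudaMemcpy(modelBuf, cacheBuf,
                          numElements * sizeof(float), cudaMemcpyDeviceToDevice));

    CUDA_CHECK(cudaFree(modelBuf));
    CUDA_CHECK(cudaFree(cacheBuf));
    std::puts("device-to-device round trip done");
    return 0;
}
```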
@@ -205,7 +212,7 @@ namespace Microsoft {
 m_prefetchThread = new thread([&](){
 float factor = DecayCoefficient();
 int t_cacheIdx = m_bufferInUse;
-int deviceId = m_gpuAsyncBuffer[t_cacheIdx][0]->GetDeviceId();
+int deviceId = m_gpuAsyncBuffer[t_cacheIdx][0].GetDeviceId();

 CudaErrorCheck(cudaSetDevice(deviceId));

@@ -214,8 +221,8 @@ namespace Microsoft {
 ElemType * px = m_deltaArray + m_tableOffsets[widx];
 //GPU buffer -> CPU buffer
 CudaErrorCheck(cudaMemcpyAsync(px,
-m_gpuAsyncBuffer[t_cacheIdx][widx]->BufferPointer(),
-m_gpuAsyncBuffer[t_cacheIdx][widx]->GetNumElements() * sizeof(ElemType),
+m_gpuAsyncBuffer[t_cacheIdx][widx].Data(),
+m_gpuAsyncBuffer[t_cacheIdx][widx].GetNumElements() * sizeof(ElemType),
 cudaMemcpyDeviceToHost,
 _commStream));
 }
@@ -242,9 +249,9 @@ namespace Microsoft {
 {
 ElemType * py = m_cpuAsyncBuffer[t_cacheIdx] + m_tableOffsets[widx];

-CudaErrorCheck(cudaMemcpyAsync(m_gpuAsyncBuffer[t_cacheIdx][widx]->BufferPointer(),
+CudaErrorCheck(cudaMemcpyAsync(m_gpuAsyncBuffer[t_cacheIdx][widx].Data(),
 py,
-m_gpuAsyncBuffer[t_cacheIdx][widx]->GetNumElements() * sizeof(ElemType),
+m_gpuAsyncBuffer[t_cacheIdx][widx].GetNumElements() * sizeof(ElemType),
 cudaMemcpyHostToDevice,
 _commStream));
 }
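These prefetch-thread hunks keep the same asynchronous pattern and only switch the accessors: `cudaMemcpyAsync` on the dedicated `_commStream` moves a GPU cache slot into the CPU buffer before the parameter-server exchange, then moves the updated values back afterwards. A rough sketch of that pattern with pinned host memory (the diff's `m_cpuAsyncBuffer` is likewise page-locked; the buffer names here are made up for illustration):

```cpp
#include <cstdio>
#include <cuda_runtime.h>

int main()
{
    const size_t n = 1 << 20;
    float *devBuf = nullptr, *hostBuf = nullptr;
    cudaStream_t commStream;

    cudaMalloc((void**)&devBuf, n * sizeof(float));
    cudaMallocHost((void**)&hostBuf, n * sizeof(float)); // pinned memory: required for truly async copies
    cudaStreamCreate(&commStream);                       // mirrors cudaStreamCreate(&_commStream)

    // GPU buffer -> CPU buffer (before handing the local update to the parameter server).
    cudaMemcpyAsync(hostBuf, devBuf, n * sizeof(float),
                    cudaMemcpyDeviceToHost, commStream);
    cudaStreamSynchronize(commStream); // make sure the host copy is complete before the CPU touches it

    // ... parameter-server add/get on the CPU buffer would happen here ...

    // CPU buffer -> GPU buffer (bring the updated model back into the cache slot).
    cudaMemcpyAsync(devBuf, hostBuf, n * sizeof(float),
                    cudaMemcpyHostToDevice, commStream);
    cudaStreamSynchronize(commStream); // the worker waits on this before swapping buffers

    std::puts("async round trip on the communication stream finished");
    cudaStreamDestroy(commStream);
    cudaFreeHost(hostBuf);
    cudaFree(devBuf);
    return 0;
}
```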
@@ -376,8 +383,13 @@ namespace Microsoft {
 }

 #ifndef CPUONLY
+printf("here!1\n");
+fflush(stdout);
 for (int i = 0; i < m_localCacheNumber; i++)
-m_gpuAsyncBuffer[i] = new Matrix<ElemType>*[m_tableCount];
+//m_gpuAsyncBuffer[i] = new Matrix<ElemType>*[m_tableCount];
+m_gpuAsyncBuffer[i].reserve(m_tableCount);
+printf("here!2\n");
+fflush(stdout);

 //create pinned memory
 for (int i = 0; i < m_localCacheNumber; ++i)
@@ -433,7 +445,8 @@ namespace Microsoft {
 ElemType ** m_cpuAsyncBuffer;

 //GPU double buffer
-Matrix<ElemType> *** m_gpuAsyncBuffer;
+//Matrix<ElemType> ** m_gpuAsyncBuffer;
+std::vector<std::vector<Matrix<ElemType> >> m_gpuAsyncBuffer;
 int m_tableCount;
 #ifndef CPUONLY
 cudaStream_t _commStream;
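This declaration is what the earlier call-site edits depend on: the GPU double buffer drops the raw `Matrix<ElemType>***` in favor of `std::vector<std::vector<Matrix<ElemType>>>`, sized to `m_localCacheNumber` outer slots with `m_tableCount` matrices each (see the `resize` and `reserve` calls in the constructor and init hunks). A toy sketch of the two-level layout and its `[cacheIndex][tableIndex]` indexing, again with a stand-in element type:

```cpp
#include <cstdio>
#include <vector>

struct Mat { std::vector<float> data; }; // stand-in for Matrix<ElemType>

int main()
{
    const int localCacheNumber = 2; // double buffer: one slot in use, one being prefetched
    const int tableCount = 3;       // one table per learnable node

    // Replaces: Matrix<ElemType> *** m_gpuAsyncBuffer;
    std::vector<std::vector<Mat>> gpuAsyncBuffer;
    gpuAsyncBuffer.resize(localCacheNumber);   // as in the constructor hunk
    for (int i = 0; i < localCacheNumber; ++i)
        gpuAsyncBuffer[i].reserve(tableCount); // as in the init hunk

    for (int i = 0; i < localCacheNumber; ++i)
        for (int j = 0; j < tableCount; ++j)
            gpuAsyncBuffer[i].push_back(Mat{{float(i), float(j)}});

    // Indexed exactly like the old pointer array: [cacheIndex][tableIndex].
    std::printf("buffer[1][2] holds %zu elements\n", gpuAsyncBuffer[1][2].data.size());
    return 0;
}
```

Reserving `tableCount` up front means the `push_back` loop never reallocates, so the matrices stay put once created.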
@@ -12,10 +12,10 @@ namespace Microsoft {
 };

 template<class ElemType = float>
-class MultiversoWrapper
+class MultiversoHelper
 {
 public:
-MultiversoWrapper(const std::list<ComputationNodeBasePtr> & learnableNodes,
+MultiversoHelper(const std::list<ComputationNodeBasePtr> & learnableNodes,
 int localWorkerNumber,
 bool isPipeline = true,
 AdjustLearningRateatBeginning adjusttype = AdjustLearningRateatBeginning::None,
@@ -25,7 +25,7 @@ namespace Microsoft {

 }

-~MultiversoWrapper()
+~MultiversoHelper()
 {

 }
@@ -348,16 +348,16 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
 //Multiverso Warpper for ASGD logic init
 if (m_parallelizationMethod == ParallelizationMethod::DataParallelASGD)
 {
-m_multiverso = new MultiversoWrapper<ElemType>(learnableNodes,
-g_mpi->NumNodesInUse(),
+m_pMultiversoHelper = new MultiversoHelper<ElemType>(learnableNodes,
+m_mpi->NumNodesInUse(),
 m_isPipeline,
 m_adjustlearningrateatbeginning,
 m_adjustcoefficient,
 m_adjustnbminibatch,
 m_traceLevel);
-m_multiverso->InitModel(learnableNodes);
-m_multiversoBarrier = false;
-m_multiverso->WaitAll();
+m_pMultiversoHelper->InitModel(learnableNodes);
+m_pMultiversoHelperBarrier = false;
+m_pMultiversoHelper->WaitAll();
 }

 // --- MAIN EPOCH LOOP
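Beyond the renames (MultiversoWrapper to MultiversoHelper, g_mpi to m_mpi), this hunk together with the teardown hunk further down defines the helper's lifecycle inside SGD: construct `m_pMultiversoHelper` only when `DataParallelASGD` is selected, call `InitModel` and a one-time `WaitAll`, exchange parameters during training, and delete the helper after the epoch loop. A compressed, hypothetical sketch of that call sequence; the `Helper` and `Node` types below are stand-ins, not CNTK classes:

```cpp
#include <cstdio>
#include <list>
#include <memory>

// Hypothetical stand-ins: per the diff, CNTK's MultiversoHelper<ElemType> exposes
// InitModel, WaitAll, WaitAsyncBuffer and PushAndPullModel; Node stands for a learnable node.
struct Node { const char* name; };

struct Helper
{
    void InitModel(const std::list<Node>&)        { std::puts("InitModel"); }
    void WaitAll()                                { std::puts("WaitAll (barrier across workers)"); }
    void WaitAsyncBuffer()                        { std::puts("WaitAsyncBuffer (drain prefetch thread)"); }
    void PushAndPullModel(const std::list<Node>&) { std::puts("PushAndPullModel"); }
};

int main()
{
    std::list<Node> learnableNodes{{"W"}, {"b"}};
    bool useASGD = true;

    std::unique_ptr<Helper> helper;
    if (useASGD)
    {
        // Mirrors: m_pMultiversoHelper = new MultiversoHelper<ElemType>(learnableNodes, ...)
        helper = std::make_unique<Helper>();
        helper->InitModel(learnableNodes);
        helper->WaitAll(); // all workers start the first epoch from the same model
    }

    // Inside the minibatch loop: push local updates and pull the global model periodically.
    if (helper)
        helper->PushAndPullModel(learnableNodes);

    // helper is released automatically here; SGD does the same with an explicit delete.
    return 0;
}
```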
@@ -716,7 +716,7 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
 // Synchronize all ranks before proceeding to ensure that
 // rank 0 has finished writing the model file
 // TODO[DataASGD]: should othet other rank waiting in async-mode
-if (m_mpi != nullptr && GetParallazationMethod() != ParallelizationMethod::DataParallelASGD)
+if (m_mpi != nullptr && GetParallelizationMethod() != ParallelizationMethod::DataParallelASGD)
 {
 m_mpi->WaitAll();
 }
@@ -738,7 +738,7 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
 delete inputMatrices;
 if (m_parallelizationMethod == ParallelizationMethod::DataParallelASGD)
 {
-delete m_multiverso;
+delete m_pMultiversoHelper;
 }
 }

@@ -795,7 +795,7 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
 (epochNumber >= m_parallelizationStartEpochNum));
 bool useModelAveraging = ((GetParallelizationMethod() == ParallelizationMethod::ModelAveragingSGD) &&
 (epochNumber >= m_parallelizationStartEpochNum));
-bool useASGD = ((m_parallelizationMethod == ParallelizationMethod::DataParallelASGD) &&
+bool useASGD = ((GetParallelizationMethod() == ParallelizationMethod::DataParallelASGD) &&
 (epochNumber >= m_parallelizationStartEpochNum));
 bool useParallelTrain = useGradientAggregation || useModelAveraging || useASGD;

@@ -1101,12 +1101,14 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
 }
 }

-if (useASGD && g_mpi->NumNodesInUse() > 1)
+// using parameter server for parameter update
+if (useASGD && m_mpi->NumNodesInUse() > 1)
 {
-if (m_parallelizationMethod == ParallelizationMethod::DataParallelASGD && m_nEpochBarrier[epochNumber] > 0 && epochNumber % m_nEpochBarrier[epochNumber] == 0)
+if (GetParallelizationMethod() == ParallelizationMethod::DataParallelASGD && m_nEpochBarrier[epochNumber] > 0 && epochNumber % m_nEpochBarrier[epochNumber] == 0)
 {
-m_multiverso->WaitAsyncBuffer();
-m_multiverso->WaitAll();
+// simulating BSP
+m_pMultiversoHelper->WaitAsyncBuffer();
+m_pMultiversoHelper->WaitAll();
 }

 // Determine if any samples were processed across any of the ranks
@@ -1115,14 +1117,11 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
 noMoreSamplesToProcess = !wasDataRead;
 }

-size_t processedSamples = 0;
 if (nSamplesSinceLastModelSync >= m_nFramesBetweenASGDSync[epochNumber])
 {
-m_multiverso->PushAndPullModel(learnableNodes);
-processedSamples = nSamplesSinceLastModelSync;
+m_pMultiversoHelper->PushAndPullModel(learnableNodes);
 nSamplesSinceLastModelSync = 0;
 }
-aggregateNumSamplesWithLabel = processedSamples;
 }

 commTimer.Stop();
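These two training-loop hunks set the synchronization cadence: once a worker has processed `m_nFramesBetweenASGDSync[epochNumber]` samples since the last exchange it calls `PushAndPullModel`, and on epochs selected by `m_nEpochBarrier` it additionally drains the async buffer and runs `WaitAll`, which the new comment describes as simulating BSP. A small sketch of the sample-count trigger (all thresholds and counters below are illustrative):

```cpp
#include <cstdio>

int main()
{
    const int framesBetweenASGDSync = 1280; // stand-in for m_nFramesBetweenASGDSync[epochNumber]
    int nSamplesSinceLastModelSync = 0;

    // Pretend minibatch loop: 10 minibatches of 256 samples each.
    for (int mb = 0; mb < 10; ++mb)
    {
        nSamplesSinceLastModelSync += 256;

        // Mirrors the diff: sync with the parameter server once enough samples have accumulated.
        if (nSamplesSinceLastModelSync >= framesBetweenASGDSync)
        {
            std::printf("minibatch %d: PushAndPullModel after %d samples\n",
                        mb, nSamplesSinceLastModelSync);
            nSamplesSinceLastModelSync = 0; // reset, as in the diff
        }
    }
    return 0;
}
```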
@@ -1236,7 +1235,7 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,

 timer.Restart();
 totalEpochSamples += aggregateNumSamplesWithLabel;
-if (!useModelAveraging && !useDataASGD)
+if (!useModelAveraging && !useASGD)
 totalSamplesSeen += aggregateNumSamplesWithLabel;
 readTimer.Restart();

@@ -1263,13 +1262,13 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
 nSamplesSinceLastModelSync = 0;
 }

-if (useASGD && (g_mpi->NumNodesInUse() > 1))
+if (useASGD && (m_mpi->NumNodesInUse() > 1))
 {
-// ASGD also may not be synced after epoch finished, so do the sync here
+// ASGD also shouldn't syncing after every epoch
 int residualSampels = (int)nSamplesSinceLastModelSync;
 totalSamplesSeen += residualSampels;
 totalEpochSamples += residualSampels;
-m_multiverso->PushAndPullModel(learnableNodes);
+m_pMultiversoHelper->PushAndPullModel(learnableNodes);
 nSamplesSinceLastModelSync = 0;
 }

@@ -555,8 +555,8 @@ protected:

 private:
 int SGDTrace(FILE* __restrict __stream, bool isPrependTimestamp, const char* __restrict __format, ...);
-MultiversoWrapper<ElemType>* m_multiverso;
-bool m_multiversoBarrier;
+MultiversoHelper<ElemType>* m_pMultiversoHelper;
+bool m_pMultiversoHelperBarrier;
 };

 }}}
@@ -67,7 +67,7 @@
 </ItemDefinitionGroup>
 <ItemDefinitionGroup Condition="'$(CNTK_ENABLE_ASGD)'=='true'">
 <ClCompile>
-<AdditionalIncludeDirectories>$(SolutionDir)Source\multiverso;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
+<AdditionalIncludeDirectories>$(SolutionDir)Source\multiverso\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
 </ClCompile>
 </ItemDefinitionGroup>
 <ItemDefinitionGroup Condition="$(DebugBuild)">