updating multiverso helper for the new matrix interface

This commit is contained in:
Qiwei Ye 2016-04-16 19:10:11 +08:00
Родитель 9760990506
Коммит 7323d7c519
6 изменённых файлов: 161 добавлений и 150 удалений

Просмотреть файл

@ -479,7 +479,6 @@ int wmainWithBS(int argc, wchar_t* argv[]) // called from wmain which is a wrapp
bool paralleltrain = config(L"parallelTrain", false); bool paralleltrain = config(L"parallelTrain", false);
if (paralleltrain) if (paralleltrain)
mpi = MPIWrapper::GetInstance(true /*create*/); mpi = MPIWrapper::GetInstance(true /*create*/);
}
g_shareNodeValueMatrices = config(L"shareNodeValueMatrices", false); g_shareNodeValueMatrices = config(L"shareNodeValueMatrices", false);

Просмотреть файл

@ -46,11 +46,11 @@ namespace Microsoft {
}; };
template<class ElemType = float> template<class ElemType = float>
class MultiversoWrapper class MultiversoHelper
{ {
typedef shared_ptr<ComputationNode<ElemType>> ComputationNodePtr; typedef shared_ptr<ComputationNode<ElemType>> ComputationNodePtr;
public: public:
MultiversoWrapper(const std::list<ComputationNodeBasePtr> & learnableNodes, MultiversoHelper(const std::list<ComputationNodeBasePtr> & learnableNodes,
int MPINodeNum, int MPINodeNum,
bool isAsyncBuffered = true, bool isAsyncBuffered = true,
AdjustLearningRateatBeginning adjusttype = AdjustLearningRateatBeginning::None, AdjustLearningRateatBeginning adjusttype = AdjustLearningRateatBeginning::None,
@ -74,7 +74,8 @@ namespace Microsoft {
m_cpuAsyncBuffer = new ElemType*[m_localCacheNumber]; m_cpuAsyncBuffer = new ElemType*[m_localCacheNumber];
#ifndef CPUONLY #ifndef CPUONLY
//GPU asynchronous buffer //GPU asynchronous buffer
m_gpuAsyncBuffer = new Matrix<ElemType>**[m_localCacheNumber]; //m_gpuAsyncBuffer = new Matrix<ElemType>**[m_localCacheNumber];
m_gpuAsyncBuffer.resize(m_localCacheNumber);
//create a communication stream for the data transfer between GPU and CPU //create a communication stream for the data transfer between GPU and CPU
CudaErrorCheck(cudaStreamCreate(&_commStream)); CudaErrorCheck(cudaStreamCreate(&_commStream));
@ -91,9 +92,9 @@ namespace Microsoft {
MultiversoInit(learnableNodes); MultiversoInit(learnableNodes);
} }
~MultiversoWrapper() ~MultiversoHelper()
{ {
fprintf(stderr, "~MultiversoWrapper\n"); fprintf(stderr, "~MultiversoHelper\n");
fflush(stderr); fflush(stderr);
if (m_isUseAsyncBuffered && m_prefetchThread != nullptr && m_prefetchThread->joinable()) if (m_isUseAsyncBuffered && m_prefetchThread != nullptr && m_prefetchThread->joinable())
@ -126,11 +127,17 @@ namespace Microsoft {
{ {
ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter); ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
Matrix<ElemType> &mat = node->Value(); Matrix<ElemType> &mat = node->Value();
printf("here!2\n");
fflush(stdout);
#pragma warning( push )
#pragma warning( disable : 4238)
#ifndef CPUONLY #ifndef CPUONLY
for (int j = 0; j < m_localCacheNumber; j++) for (int j = 0; j < m_localCacheNumber; j++)
m_gpuAsyncBuffer[j][i] = new Matrix<ElemType>(mat); m_gpuAsyncBuffer[j].push_back(mat.DeepClone());
//m_gpuAsyncBuffer[j][i] = mat.DeepClone();
#endif #endif
#pragma warning( pop )
ElemType* px = m_cpuAsyncBuffer[0] + m_tableOffsets[i]; ElemType* px = m_cpuAsyncBuffer[0] + m_tableOffsets[i];
mat.CopyToArray(px, m_tableLength[i]); mat.CopyToArray(px, m_tableLength[i]);
} }
@ -178,14 +185,14 @@ namespace Microsoft {
Microsoft::MSR::CNTK::Matrix<ElemType> &mat = node->Value(); Microsoft::MSR::CNTK::Matrix<ElemType> &mat = node->Value();
#ifndef CPUONLY #ifndef CPUONLY
//CNTK model -> GPU buffer //CNTK model -> GPU buffer
CudaErrorCheck(cudaMemcpy(m_gpuAsyncBuffer[m_bufferInUse][i]->BufferPointer(), CudaErrorCheck(cudaMemcpy(m_gpuAsyncBuffer[m_bufferInUse][i].Data(),
mat.BufferPointer(), mat.Data(),
mat.GetNumElements() * sizeof(ElemType), mat.GetNumElements() * sizeof(ElemType),
cudaMemcpyDeviceToDevice)); cudaMemcpyDeviceToDevice));
//GPU buffer -> CNTK model //GPU buffer -> CNTK model
CudaErrorCheck(cudaMemcpy(mat.BufferPointer(), CudaErrorCheck(cudaMemcpy(mat.Data(),
m_gpuAsyncBuffer[m_cacheSwapIndex[m_bufferInUse]][i]->BufferPointer(), m_gpuAsyncBuffer[m_cacheSwapIndex[m_bufferInUse]][i].Data(),
mat.GetNumElements() * sizeof(ElemType), mat.GetNumElements() * sizeof(ElemType),
cudaMemcpyDeviceToDevice)); cudaMemcpyDeviceToDevice));
#else #else
@ -205,7 +212,7 @@ namespace Microsoft {
m_prefetchThread = new thread([&](){ m_prefetchThread = new thread([&](){
float factor = DecayCoefficient(); float factor = DecayCoefficient();
int t_cacheIdx = m_bufferInUse; int t_cacheIdx = m_bufferInUse;
int deviceId = m_gpuAsyncBuffer[t_cacheIdx][0]->GetDeviceId(); int deviceId = m_gpuAsyncBuffer[t_cacheIdx][0].GetDeviceId();
CudaErrorCheck(cudaSetDevice(deviceId)); CudaErrorCheck(cudaSetDevice(deviceId));
@ -214,8 +221,8 @@ namespace Microsoft {
ElemType * px = m_deltaArray + m_tableOffsets[widx]; ElemType * px = m_deltaArray + m_tableOffsets[widx];
//GPU buffer -> CPU buffer //GPU buffer -> CPU buffer
CudaErrorCheck(cudaMemcpyAsync(px, CudaErrorCheck(cudaMemcpyAsync(px,
m_gpuAsyncBuffer[t_cacheIdx][widx]->BufferPointer(), m_gpuAsyncBuffer[t_cacheIdx][widx].Data(),
m_gpuAsyncBuffer[t_cacheIdx][widx]->GetNumElements() * sizeof(ElemType), m_gpuAsyncBuffer[t_cacheIdx][widx].GetNumElements() * sizeof(ElemType),
cudaMemcpyDeviceToHost, cudaMemcpyDeviceToHost,
_commStream)); _commStream));
} }
@ -242,9 +249,9 @@ namespace Microsoft {
{ {
ElemType * py = m_cpuAsyncBuffer[t_cacheIdx] + m_tableOffsets[widx]; ElemType * py = m_cpuAsyncBuffer[t_cacheIdx] + m_tableOffsets[widx];
CudaErrorCheck(cudaMemcpyAsync(m_gpuAsyncBuffer[t_cacheIdx][widx]->BufferPointer(), CudaErrorCheck(cudaMemcpyAsync(m_gpuAsyncBuffer[t_cacheIdx][widx].Data(),
py, py,
m_gpuAsyncBuffer[t_cacheIdx][widx]->GetNumElements() * sizeof(ElemType), m_gpuAsyncBuffer[t_cacheIdx][widx].GetNumElements() * sizeof(ElemType),
cudaMemcpyHostToDevice, cudaMemcpyHostToDevice,
_commStream)); _commStream));
} }
@ -376,8 +383,13 @@ namespace Microsoft {
} }
#ifndef CPUONLY #ifndef CPUONLY
printf("here!1\n");
fflush(stdout);
for (int i = 0; i < m_localCacheNumber; i++) for (int i = 0; i < m_localCacheNumber; i++)
m_gpuAsyncBuffer[i] = new Matrix<ElemType>*[m_tableCount]; //m_gpuAsyncBuffer[i] = new Matrix<ElemType>*[m_tableCount];
m_gpuAsyncBuffer[i].reserve(m_tableCount);
printf("here!2\n");
fflush(stdout);
//create pinned memory //create pinned memory
for (int i = 0; i < m_localCacheNumber; ++i) for (int i = 0; i < m_localCacheNumber; ++i)
@ -433,7 +445,8 @@ namespace Microsoft {
ElemType ** m_cpuAsyncBuffer; ElemType ** m_cpuAsyncBuffer;
//GPU double buffer //GPU double buffer
Matrix<ElemType> *** m_gpuAsyncBuffer; //Matrix<ElemType> ** m_gpuAsyncBuffer;
std::vector<std::vector<Matrix<ElemType> >> m_gpuAsyncBuffer;
int m_tableCount; int m_tableCount;
#ifndef CPUONLY #ifndef CPUONLY
cudaStream_t _commStream; cudaStream_t _commStream;

Просмотреть файл

@ -12,10 +12,10 @@ namespace Microsoft {
}; };
template<class ElemType = float> template<class ElemType = float>
class MultiversoWrapper class MultiversoHelper
{ {
public: public:
MultiversoWrapper(const std::list<ComputationNodeBasePtr> & learnableNodes, MultiversoHelper(const std::list<ComputationNodeBasePtr> & learnableNodes,
int localWorkerNumber, int localWorkerNumber,
bool isPipeline = true, bool isPipeline = true,
AdjustLearningRateatBeginning adjusttype = AdjustLearningRateatBeginning::None, AdjustLearningRateatBeginning adjusttype = AdjustLearningRateatBeginning::None,
@ -25,7 +25,7 @@ namespace Microsoft {
} }
~MultiversoWrapper() ~MultiversoHelper()
{ {
} }

Просмотреть файл

@ -348,16 +348,16 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
//Multiverso Wrapper for ASGD logic init //Multiverso Wrapper for ASGD logic init
if (m_parallelizationMethod == ParallelizationMethod::DataParallelASGD) if (m_parallelizationMethod == ParallelizationMethod::DataParallelASGD)
{ {
m_multiverso = new MultiversoWrapper<ElemType>(learnableNodes, m_pMultiversoHelper = new MultiversoHelper<ElemType>(learnableNodes,
g_mpi->NumNodesInUse(), m_mpi->NumNodesInUse(),
m_isPipeline, m_isPipeline,
m_adjustlearningrateatbeginning, m_adjustlearningrateatbeginning,
m_adjustcoefficient, m_adjustcoefficient,
m_adjustnbminibatch, m_adjustnbminibatch,
m_traceLevel); m_traceLevel);
m_multiverso->InitModel(learnableNodes); m_pMultiversoHelper->InitModel(learnableNodes);
m_multiversoBarrier = false; m_pMultiversoHelperBarrier = false;
m_multiverso->WaitAll(); m_pMultiversoHelper->WaitAll();
} }
// --- MAIN EPOCH LOOP // --- MAIN EPOCH LOOP
@ -716,7 +716,7 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
// Synchronize all ranks before proceeding to ensure that // Synchronize all ranks before proceeding to ensure that
// rank 0 has finished writing the model file // rank 0 has finished writing the model file
// TODO[DataASGD]: should the other ranks wait in async mode? // TODO[DataASGD]: should the other ranks wait in async mode?
if (m_mpi != nullptr && GetParallazationMethod() != ParallelizationMethod::DataParallelASGD) if (m_mpi != nullptr && GetParallelizationMethod() != ParallelizationMethod::DataParallelASGD)
{ {
m_mpi->WaitAll(); m_mpi->WaitAll();
} }
@ -738,7 +738,7 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
delete inputMatrices; delete inputMatrices;
if (m_parallelizationMethod == ParallelizationMethod::DataParallelASGD) if (m_parallelizationMethod == ParallelizationMethod::DataParallelASGD)
{ {
delete m_multiverso; delete m_pMultiversoHelper;
} }
} }
@ -795,7 +795,7 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
(epochNumber >= m_parallelizationStartEpochNum)); (epochNumber >= m_parallelizationStartEpochNum));
bool useModelAveraging = ((GetParallelizationMethod() == ParallelizationMethod::ModelAveragingSGD) && bool useModelAveraging = ((GetParallelizationMethod() == ParallelizationMethod::ModelAveragingSGD) &&
(epochNumber >= m_parallelizationStartEpochNum)); (epochNumber >= m_parallelizationStartEpochNum));
bool useASGD = ((m_parallelizationMethod == ParallelizationMethod::DataParallelASGD) && bool useASGD = ((GetParallelizationMethod() == ParallelizationMethod::DataParallelASGD) &&
(epochNumber >= m_parallelizationStartEpochNum)); (epochNumber >= m_parallelizationStartEpochNum));
bool useParallelTrain = useGradientAggregation || useModelAveraging || useASGD; bool useParallelTrain = useGradientAggregation || useModelAveraging || useASGD;
@ -1101,12 +1101,14 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
} }
} }
if (useASGD && g_mpi->NumNodesInUse() > 1) // using parameter server for parameter update
if (useASGD && m_mpi->NumNodesInUse() > 1)
{ {
if (m_parallelizationMethod == ParallelizationMethod::DataParallelASGD && m_nEpochBarrier[epochNumber] > 0 && epochNumber % m_nEpochBarrier[epochNumber] == 0) if (GetParallelizationMethod() == ParallelizationMethod::DataParallelASGD && m_nEpochBarrier[epochNumber] > 0 && epochNumber % m_nEpochBarrier[epochNumber] == 0)
{ {
m_multiverso->WaitAsyncBuffer(); // simulating BSP
m_multiverso->WaitAll(); m_pMultiversoHelper->WaitAsyncBuffer();
m_pMultiversoHelper->WaitAll();
} }
// Determine if any samples were processed across any of the ranks // Determine if any samples were processed across any of the ranks
@ -1115,14 +1117,11 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
noMoreSamplesToProcess = !wasDataRead; noMoreSamplesToProcess = !wasDataRead;
} }
size_t processedSamples = 0;
if (nSamplesSinceLastModelSync >= m_nFramesBetweenASGDSync[epochNumber]) if (nSamplesSinceLastModelSync >= m_nFramesBetweenASGDSync[epochNumber])
{ {
m_multiverso->PushAndPullModel(learnableNodes); m_pMultiversoHelper->PushAndPullModel(learnableNodes);
processedSamples = nSamplesSinceLastModelSync;
nSamplesSinceLastModelSync = 0; nSamplesSinceLastModelSync = 0;
} }
aggregateNumSamplesWithLabel = processedSamples;
} }
commTimer.Stop(); commTimer.Stop();
@ -1236,7 +1235,7 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
timer.Restart(); timer.Restart();
totalEpochSamples += aggregateNumSamplesWithLabel; totalEpochSamples += aggregateNumSamplesWithLabel;
if (!useModelAveraging && !useDataASGD) if (!useModelAveraging && !useASGD)
totalSamplesSeen += aggregateNumSamplesWithLabel; totalSamplesSeen += aggregateNumSamplesWithLabel;
readTimer.Restart(); readTimer.Restart();
@ -1263,13 +1262,13 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
nSamplesSinceLastModelSync = 0; nSamplesSinceLastModelSync = 0;
} }
if (useASGD && (g_mpi->NumNodesInUse() > 1)) if (useASGD && (m_mpi->NumNodesInUse() > 1))
{ {
// ASGD also may not be synced after epoch finished, so do the sync here // ASGD also shouldn't syncing after every epoch
int residualSampels = (int)nSamplesSinceLastModelSync; int residualSampels = (int)nSamplesSinceLastModelSync;
totalSamplesSeen += residualSampels; totalSamplesSeen += residualSampels;
totalEpochSamples += residualSampels; totalEpochSamples += residualSampels;
m_multiverso->PushAndPullModel(learnableNodes); m_pMultiversoHelper->PushAndPullModel(learnableNodes);
nSamplesSinceLastModelSync = 0; nSamplesSinceLastModelSync = 0;
} }

Просмотреть файл

@ -555,8 +555,8 @@ protected:
private: private:
int SGDTrace(FILE* __restrict __stream, bool isPrependTimestamp, const char* __restrict __format, ...); int SGDTrace(FILE* __restrict __stream, bool isPrependTimestamp, const char* __restrict __format, ...);
MultiversoWrapper<ElemType>* m_multiverso; MultiversoHelper<ElemType>* m_pMultiversoHelper;
bool m_multiversoBarrier; bool m_pMultiversoHelperBarrier;
}; };
}}} }}}

Просмотреть файл

@ -67,7 +67,7 @@
</ItemDefinitionGroup> </ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(CNTK_ENABLE_ASGD)'=='true'"> <ItemDefinitionGroup Condition="'$(CNTK_ENABLE_ASGD)'=='true'">
<ClCompile> <ClCompile>
<AdditionalIncludeDirectories>$(SolutionDir)Source\multiverso;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories> <AdditionalIncludeDirectories>$(SolutionDir)Source\multiverso\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
</ClCompile> </ClCompile>
</ItemDefinitionGroup> </ItemDefinitionGroup>
<ItemDefinitionGroup Condition="$(DebugBuild)"> <ItemDefinitionGroup Condition="$(DebugBuild)">