From 65750afd50654f2e342aba090bdc23412bbc6c1c Mon Sep 17 00:00:00 2001 From: feiga Date: Tue, 1 Nov 2016 17:29:04 +0800 Subject: [PATCH 1/4] Fix compile error --- Source/Common/Include/NoMultiversoWrapper.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Common/Include/NoMultiversoWrapper.h b/Source/Common/Include/NoMultiversoWrapper.h index c58b06725..563648c29 100644 --- a/Source/Common/Include/NoMultiversoWrapper.h +++ b/Source/Common/Include/NoMultiversoWrapper.h @@ -20,7 +20,7 @@ public: int nodeNumRanks, bool useAsyncBuffered = true, bool isSimModelAveragingSGD = false, - AdjustLearningRateatBeginning adjusttype = AdjustLearningRateatBeginning::None, + AdjustLearningRateAtBeginning adjusttype = AdjustLearningRateAtBeginning::None, double adjustcoef = 0.2, size_t adjustnbmb = 600, int traceLevel = 0, From ce87588a268fd70826e681ac33500be1cf49e4f8 Mon Sep 17 00:00:00 2001 From: feiga Date: Tue, 1 Nov 2016 20:34:04 +0800 Subject: [PATCH 2/4] Remove ASGDCommon, add ASGDHelper Interface, and move Multiverso related implementation to ASGDHelper.cpp file --- Source/Common/Include/ASGDCommon.h | 29 - Source/Common/Include/ASGDHelper.h | 69 ++ Source/Common/Include/MultiversoWrapper.h | 737 -------------------- Source/Common/Include/NoMultiversoWrapper.h | 46 -- Source/SGDLib/ASGDHelper.cpp | 631 +++++++++++++++++ Source/SGDLib/SGD.cpp | 23 +- Source/SGDLib/SGD.h | 10 +- Source/SGDLib/SGDLib.vcxproj | 5 +- Source/SGDLib/SGDLib.vcxproj.filters | 11 +- 9 files changed, 718 insertions(+), 843 deletions(-) delete mode 100644 Source/Common/Include/ASGDCommon.h create mode 100644 Source/Common/Include/ASGDHelper.h delete mode 100644 Source/Common/Include/MultiversoWrapper.h delete mode 100644 Source/Common/Include/NoMultiversoWrapper.h create mode 100644 Source/SGDLib/ASGDHelper.cpp diff --git a/Source/Common/Include/ASGDCommon.h b/Source/Common/Include/ASGDCommon.h deleted file mode 100644 index 3ebbd08e3..000000000 --- 
a/Source/Common/Include/ASGDCommon.h +++ /dev/null @@ -1,29 +0,0 @@ -// -// -// -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE.md file in the project root for full license information. -// -// -// -#pragma once - -namespace Microsoft { namespace MSR { namespace CNTK { - -// ----------------------------------------------------------------------- -// class AdjustLearningRateAtBeginning -// Providing option for DataParallelASGD training. so that every nodes -// could adjust learning rate every minibatch at first N epochs. -// ----------------------------------------------------------------------- - - -// TODO: We can removed these options once we can adjust learning rate at minibatchs level - -enum class AdjustLearningRateAtBeginning : int -{ - None = 0, // default, don't adjust learning rate - Linearly = 1, // using linear adjustment, learning rate will from 0 to learningRatesPerMB - Staircase = (1 << 1), // using staircased adjustment, learning rate will from 0 to learningRatesPerMB every adjustNbMinibatch -}; - -}}} diff --git a/Source/Common/Include/ASGDHelper.h b/Source/Common/Include/ASGDHelper.h new file mode 100644 index 000000000..b8935de61 --- /dev/null +++ b/Source/Common/Include/ASGDHelper.h @@ -0,0 +1,69 @@ +// +// +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE.md file in the project root for full license information. +// +// +#pragma once + +#include +#include "MPIWrapper.h" +#include "ComputationNetwork.h" + +namespace Microsoft { namespace MSR { namespace CNTK { + +// ----------------------------------------------------------------------- +// class AdjustLearningRateAtBeginning +// Providing option for DataParallelASGD training. so that every nodes +// could adjust learning rate every minibatch at first N epochs. 
+// ----------------------------------------------------------------------- +// TODO: We can remove these options once we can adjust the learning rate at minibatch level +enum class AdjustLearningRateAtBeginning : int +{ + None = 0, // default, don't adjust learning rate + Linearly = 1, // using linear adjustment, learning rate will go from 0 to learningRatesPerMB + Staircase = (1 << 1), // using staircased adjustment, learning rate will go from 0 to learningRatesPerMB every adjustNbMinibatch +}; + +template +class ASGDHelper +{ + typedef shared_ptr> ComputationNodePtr; +public: + virtual ~ASGDHelper() { } + // ----------------------------------------------------------------------- + // InitModel() -- Upload the initialized model (which was pre-computed by CNTK logic) + // to the parameter servers, so that every node can start training from the same model + // ----------------------------------------------------------------------- + virtual void InitModel(const std::list & learnableNodes) = 0; + + // ----------------------------------------------------------------------- + // PushAndPullModel() -- Push parameters of learnableNodes to parameter servers, then get the latest model back. 
+ // ----------------------------------------------------------------------- + virtual bool PushAndPullModel(const std::list & learnableNodes, size_t sampleSinceLastSynced = 0) = 0; + + // ----------------------------------------------------------------------- + // WaitAll() -- Barrier: wait for all the other nodes to reach this point + // ----------------------------------------------------------------------- + virtual void WaitAll() = 0; + + virtual void WaitAsyncBuffer() = 0; // NOTE(review): presumably waits for a pending asynchronous buffer exchange to finish -- confirm against the implementation + +}; // Class ASGDHelper + +// Factory method to create an ASGDHelper instance +template +ASGDHelper* NewASGDHelper( + const std::list & learnableNodes, // Parameters that need to be trained + size_t nodeNumRanks, // Number of working nodes + bool useAsyncBuffered = true, // Using asynchronous buffer to hide communication cost + bool isSimulatedModelAveragingSGD = false, // Using parameter server-based MA rather than ASGD + AdjustLearningRateAtBeginning adjusttype = + AdjustLearningRateAtBeginning::None, // Adjust learning rate per minibatch at the very beginning of the training process + double adjustCoef = 0.2, // see in DecayCoefficient() + size_t adjustPerMinibatches = 600, // + int traceLevel = 0, // log level + int syncPerfStats = 0, // show perf data every syncPerfStats + const MPIWrapperPtr& pMPI = nullptr); + +}}} diff --git a/Source/Common/Include/MultiversoWrapper.h b/Source/Common/Include/MultiversoWrapper.h deleted file mode 100644 index 521d78db0..000000000 --- a/Source/Common/Include/MultiversoWrapper.h +++ /dev/null @@ -1,737 +0,0 @@ -// -// -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE.md file in the project root for full license information. 
-// -// -#pragma once - -// the header files located in Source\Multiverso\include -#include -#include -#include - -#ifdef MULTIVERSO_USE_MATRIXTABLE -#include -#else -#include -#endif -#include - -#pragma comment(lib, "Multiverso.lib") - -#ifndef CPUONLY -#include -#pragma comment (lib, "cudart.lib") // for cudaMemcpyAsync() -#endif - -// TODO: test for the model aggregation -#include "MPIWrapper.h" -#include "ComputationNetwork.h" -#include "TimerUtility.h" -#include "ASGDCommon.h" - -#include -#include -#include -#include -#include - -namespace Microsoft { namespace MSR { namespace CNTK { -// #define MULTIVERSO_DEBUG -#ifndef CPUONLY -#define CudaErrorCheck(ans) { gpuAssert((ans), __FILE__, __LINE__); } - inline void gpuAssert(cudaError_t code, const char *file, int line, bool abort = true) - { - if (code != cudaSuccess) - { - fprintf(stderr, "GPUassert: %s %s %d\n", cudaGetErrorString(code), file, line); - if (abort) exit(code); - } - } -#endif - -template -class MultiversoHelper -{ -typedef shared_ptr> ComputationNodePtr; -public: -// ----------------------------------------------------------------------- -// MultiversoHelper() -- Construct function for MultiversoHelper. 
-// ----------------------------------------------------------------------- - -MultiversoHelper(const std::list & learnableNodes, // Parameters that needs to be train - size_t nodeNumRanks, // Number of working nodes - bool useAsyncBuffered = true, // Using asynchonous buffer to hide communication cost - bool isSimulatedModelAveragingSGD = false, // Using parameter server-based MA rather than ASGD - AdjustLearningRateAtBeginning adjusttype = - AdjustLearningRateAtBeginning::None, // Adjust learning per minibatches at very begining of training process - // this could be used to tackle the unstableness of ASGD - double adjustCoef = 0.2, // see in DecayCoefficient() - size_t adjustPerMinibatches = 600, // - int traceLevel = 0, // log level - int syncPerfStats = 0, // shown perf data every syncPerfStats - const MPIWrapperPtr& pMPI = nullptr) - : m_parameterSyncCounter(0), m_adjustLearningRateAtBeginningType(adjusttype), - m_adjustCoefficient(adjustCoef), m_adjustMBNumber(adjustPerMinibatches), - m_totalClientNumber(nodeNumRanks), m_useAsyncBuffered(useAsyncBuffered), - m_traceLevel(traceLevel), m_ModelAveragingSGDSimulating(isSimulatedModelAveragingSGD), m_doesEveryNodesShouldSynced(false), - m_pMPI(pMPI), m_syncPerfStats(syncPerfStats) -{ - if (m_ModelAveragingSGDSimulating) - { - m_doesEveryNodesShouldSynced = true; - m_useAsyncBuffered = false; - } - // Pipeline releated variables - m_localBufferNum = m_useAsyncBuffered ? 
2 : 1; - m_bufferSwapIndex = new int[m_localBufferNum]; - - // CPU asynchronous buffer - m_cpuAsyncBuffer = new ElemType*[m_localBufferNum]; - - // Get option used by multiverso sparse update - m_getOptions.reserve(m_localBufferNum); - m_addOptions.reserve(m_localBufferNum); - -#ifndef CPUONLY - // GPU asynchronous buffer - m_gpuAsyncBuffer.resize(m_localBufferNum); - // creat an communication stream for the data tranfer between GPU and CPU - CudaErrorCheck(cudaStreamCreate(&_commStream)); -#endif - m_bufferIndexInUse = 0; - for (int i = 0; i < m_localBufferNum; i++) - m_bufferSwapIndex[i] = (i + 1) % m_localBufferNum; - - m_aysncBufferThread = nullptr; - - if (m_traceLevel > 5) - multiverso::Log::ResetLogLevel(multiverso::LogLevel::Debug); - else if (m_traceLevel > 4) - multiverso::Log::ResetLogLevel(multiverso::LogLevel::Error); - - if (m_doesEveryNodesShouldSynced) - multiverso::SetCMDFlag("sync", true); - - MultiversoInit(learnableNodes); -} -// ----------------------------------------------------------------------- -// ~MultiversoHelper() -- Destruct function for MultiversoHelper. -// ----------------------------------------------------------------------- - -~MultiversoHelper() -{ - fprintf(stderr, "~MultiversoHelper\n"); - fflush(stderr); - - if (m_useAsyncBuffered && m_aysncBufferThread != nullptr && m_aysncBufferThread->joinable()) - m_aysncBufferThread->join(); - - delete m_bufferSwapIndex, m_deltaArray; - - for (size_t i = 0; i < m_localBufferNum; i++) - { -#ifndef CPUONLY - CudaErrorCheck(cudaFreeHost(m_cpuAsyncBuffer[i])); -#else - delete m_cpuAsyncBuffer[i]; -#endif - } - delete m_cpuAsyncBuffer; -#ifndef CPUONLY - CudaErrorCheck(cudaStreamDestroy(_commStream)); -#endif - multiverso::MV_ShutDown(false); -} - -// ----------------------------------------------------------------------- -// InitModel() -- Upload initilized model (, which was pre-computed by CNTK logic) . 
-// to the parameter servers, so that every node could start training from same model -// ----------------------------------------------------------------------- -void InitModel(const std::list & learnableNodes) -{ - float factor = 1.0f / m_totalClientNumber; - - int i = 0; // indicate the index of learnable nodes - for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) - { - ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); - Matrix &mat = node->Value(); - -#ifndef CPUONLY - for (int j = 0; j < m_localBufferNum; j++) - m_gpuAsyncBuffer[j].push_back(mat.DeepClone()); -#endif - ElemType* px = m_cpuAsyncBuffer[0] + m_tableOffsets[i]; - mat.CopyToArray(px, m_tableLength[i]); - } - - for (int i = 1; i < m_localBufferNum; i++) - memcpy(m_cpuAsyncBuffer[i], m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize); - - memcpy(m_deltaArray, m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize); - - // because the parameter server will minus the delta on the server, so that we should send the minus initial model to the server. - std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), -factor)); - -// Using matrix_table on multiverso could make CNTK sync with parameter server layer by layer possable, -// but it will also slower about 10% than array_table. 
-#ifdef MULTIVERSO_USE_MATRIXTABLE - for (int widx = 0; widx < m_tableCount; widx++) - { - if (m_isSparseArray[widx]) - { - auto multiversoMatrix = m_matrixMap->at(widx); - ElemType* px = m_deltaArray + m_tableOffsets[widx]; - multiversoMatrix->Add(px, m_tableLength[widx], m_addOptions[0]); - multiversoMatrix->Get(px, m_tableLength[widx], m_getOptions[0]); - WaitAll(); - multiversoMatrix->Get(px, m_tableLength[widx], m_getOptions[0]); - } - else - { - auto multiversoMatrix = m_matrixMap->at(widx); - ElemType* px = m_deltaArray + m_tableOffsets[widx]; - multiversoMatrix->Add(px, m_tableLength[widx]); - multiversoMatrix->Get(px, m_tableLength[widx]); - WaitAll(); - multiversoMatrix->Get(px, m_tableLength[widx]); - } - } -#else - m_workerArray->Add(m_deltaArray, m_totalModelSize); - m_workerArray->Get(m_deltaArray, m_totalModelSize); - WaitAll(); - m_workerArray->Get(m_deltaArray, m_totalModelSize); -#endif - - if (std::equal(m_deltaArray, m_deltaArray + m_totalModelSize, m_cpuAsyncBuffer[0])) - multiverso::Log::Info("multiverso initial model loaded.\n"); - m_reportTimer.Start(); -} - -// ----------------------------------------------------------------------- -// PushAndPullModel() -- Push parameters of learnableNodes to parameter servers, then get the latests model back. 
-// ----------------------------------------------------------------------- -bool PushAndPullModel(const std::list & learnableNodes, size_t sampleSinceLastSynced = 0) -{ - m_parameterSyncCounter++; - - double fromCPUToGPUTime; - double fromGPUToCPUTime; - double networkTime; - double swapTimeOnGPU; - m_reportTimer.Restart(); - WaitAsyncBuffer(); - m_reportTimer.Stop(); - - // reset statics for profiling - if (m_traceLevel > 2 && m_syncPerfStats > 0 && m_parameterSyncCounter % m_syncPerfStats == 0) - { - fromCPUToGPUTime = 0; - fromGPUToCPUTime = 0; - networkTime = 0; - swapTimeOnGPU = 0; - } - - m_bufferIndexInUse = m_bufferSwapIndex[m_bufferIndexInUse]; - - int i = 0; // indicate the index of learnable nodes - if (m_useAsyncBuffered) - { - m_reportTimer.Restart(); - for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) - { - ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); - Microsoft::MSR::CNTK::Matrix &mat = node->Value(); -#ifndef CPUONLY - // CNTK model -> GPU buffer - CudaErrorCheck(cudaMemcpy(m_gpuAsyncBuffer[m_bufferIndexInUse][i].Data(), - mat.Data(), - mat.GetNumElements() * sizeof(ElemType), - cudaMemcpyDeviceToDevice)); - - // GPU buffer -> CNTK model - CudaErrorCheck(cudaMemcpy(mat.Data(), - m_gpuAsyncBuffer[m_bufferSwapIndex[m_bufferIndexInUse]][i].Data(), - mat.GetNumElements() * sizeof(ElemType), - cudaMemcpyDeviceToDevice)); -#else - ElemType * px = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[i]; - mat.CopyToArray(px, m_tableLength[i]); - ElemType * py = m_cpuAsyncBuffer[m_bufferSwapIndex[m_bufferIndexInUse]] + m_tableOffsets[i]; - mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), py); - delete px; -#endif - } - m_reportTimer.Stop(); - if (m_traceLevel > 2) - { - swapTimeOnGPU = m_reportTimer.ElapsedSeconds(); - } -#ifndef CPUONLY - m_aysncBufferThread = new thread([&]() - { - float factor = DecayCoefficient(); - int deviceId = 
m_gpuAsyncBuffer[m_bufferIndexInUse][0].GetDeviceId(); - - CudaErrorCheck(cudaSetDevice(deviceId)); - - Timer threadTimer; - threadTimer.Restart(); - for (int widx = 0; widx < m_tableCount; widx++) - { - ElemType * px = m_deltaArray + m_tableOffsets[widx]; - // GPU buffer -> CPU buffer - CudaErrorCheck(cudaMemcpyAsync(px, - m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(), - m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType), - cudaMemcpyDeviceToHost, - _commStream)); - } - // waiting copy from GPU to CPU has finished - CudaErrorCheck(cudaStreamSynchronize(_commStream)); - threadTimer.Stop(); - - if (m_traceLevel > 3) - { - double time = threadTimer.ElapsedSeconds(); - fprintf(stderr, "\t\t -- pullAndRequest, GPU -> CPU time %lf \n", time); - } - - // delta = gradient * learning_rate - std::transform(m_cpuAsyncBuffer[m_bufferIndexInUse], - m_cpuAsyncBuffer[m_bufferIndexInUse] + m_totalModelSize, - m_deltaArray, m_deltaArray, - std::minus()); - - threadTimer.Restart(); - // lr decay - std::transform(m_deltaArray, - m_deltaArray + m_totalModelSize, - m_deltaArray, - std::bind1st(std::multiplies(), factor)); - -#ifdef MULTIVERSO_USE_MATRIXTABLE - for (int widx = 0; widx < m_tableCount; widx++) - { - if (m_isSparseArray[widx]) - { - auto multiversoMatrix = m_matrixMap->at(widx); - ElemType* px = m_deltaArray + m_tableOffsets[widx]; - ElemType* py = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[widx]; - multiversoMatrix->Add(px, m_tableLength[widx], m_addOptions[m_bufferIndexInUse]); - multiversoMatrix->Get(py, m_tableLength[widx], m_getOptions[m_bufferIndexInUse]); - } - else - { - auto multiversoMatrix = m_matrixMap->at(widx); - ElemType* px = m_deltaArray + m_tableOffsets[widx]; - ElemType* py = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[widx]; - multiversoMatrix->Add(px, m_tableLength[widx]); - multiversoMatrix->Get(py, m_tableLength[widx]); - } - } -#else - ElemType* px = m_deltaArray; - ElemType* py = 
m_cpuAsyncBuffer[m_bufferIndexInUse]; - m_workerArray->AddAsync(px, m_totalModelSize); - m_workerArray->Get(py, m_totalModelSize); - -#endif - threadTimer.Stop(); - if (m_traceLevel > 3) - { - double time = threadTimer.ElapsedSeconds(); - fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time); - } - - threadTimer.Restart(); - // copy parameters from CPU buffer to GPU buffer - for (int widx = 0; widx < m_tableCount; widx++) - { - ElemType * py = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[widx]; - - CudaErrorCheck(cudaMemcpyAsync(m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(), - py, - m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType), - cudaMemcpyHostToDevice, - _commStream)); - } - CudaErrorCheck(cudaStreamSynchronize(_commStream)); - threadTimer.Stop(); - if (m_traceLevel > 3) - { - double time = threadTimer.ElapsedSeconds(); - fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time); - } - }); -#else - m_aysncBufferThread = new thread([&]() - { - float factor = DecayCoefficient(); - int t_cacheIdx = m_bufferIndexInUse; - - std::transform(m_cpuAsyncBuffer[t_cacheIdx], m_cpuAsyncBuffer[t_cacheIdx] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus()); - std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), factor)); -#ifdef MULTIVERSO_USE_MATRIXTABLEM - for (int widx = 0; widx < m_tableCount; widx++) - { - auto multiversoMatrix = m_matrixMap->at(widx); - ElemType* px = m_deltaArray + m_tableOffsets[widx]; - ElemType* py = m_cpuAsyncBuffer[t_cacheIdx] + m_tableOffsets[widx]; - multiversoMatrix->Add(px, m_tableLength[widx], m_addOptions[t_cacheIdx]); - multiversoMatrix->Get(py, m_tableLength[widx], m_getOptions[t_cacheIdx]); - } -#else - ElemType* px = m_deltaArray; - ElemType* py = m_cpuAsyncBuffer[t_cacheIdx]; - m_workerArray->AddAsync(px, m_totalModelSize); - m_workerArray->Get(py, m_totalModelSize); -#endif - - }); 
-#endif - } - else - { - m_reportTimer.Restart(); - float factor = DecayCoefficient(); - i = 0; - for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) - { - ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); - Microsoft::MSR::CNTK::Matrix &mat = node->Value(); - - ElemType * px = m_deltaArray + m_tableOffsets[i]; - mat.CopyToArray(px, m_tableLength[i]); - } - - m_reportTimer.Stop(); - if (m_traceLevel > 3) - { - double time = m_reportTimer.ElapsedSeconds(); - fprintf(stderr, "\t\t -- pullAndRequest, GPU -> CPU time %lf \n", time); - } - std::transform(m_cpuAsyncBuffer[0], m_cpuAsyncBuffer[0] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus()); - - // lr decay - if (m_ModelAveragingSGDSimulating) - { - factor = ModelAggregationCoefficient(sampleSinceLastSynced); - std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), factor)); - if (m_traceLevel > 2 && m_syncPerfStats != 0) - { - if (m_parameterSyncCounter % m_syncPerfStats == 0) - ReportPerfStats(m_totalClientNumber * m_sampleSinceLastReport, m_sampleSinceLastReport); - else - m_sampleSinceLastReport += sampleSinceLastSynced; - } - - } - else - { - std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), factor)); - } - m_reportTimer.Restart(); -#ifdef MULTIVERSO_USE_MATRIXTABLE - for (int widx = 0; widx < m_tableCount; widx++) - { - if (m_isSparseArray[widx]) - { - auto multiversoMatrix = m_matrixMap->at(widx); - ElemType* px = m_deltaArray + m_tableOffsets[widx]; - ElemType* py = m_cpuAsyncBuffer[0] + m_tableOffsets[widx]; - multiversoMatrix->Add(px, m_tableLength[widx], m_addOptions[0]); - multiversoMatrix->Get(py, m_tableLength[widx], m_getOptions[0]); - } - else - { - auto multiversoMatrix = m_matrixMap->at(widx); - ElemType* px = m_deltaArray + m_tableOffsets[widx]; - ElemType* py = m_cpuAsyncBuffer[0] + m_tableOffsets[widx]; - 
multiversoMatrix->Add(px, m_tableLength[widx]); - multiversoMatrix->Get(py, m_tableLength[widx]); - } - } -#else - ElemType* px = m_deltaArray; - ElemType* py = m_cpuAsyncBuffer[0]; - m_workerArray->AddAsync(px, m_totalModelSize); - m_workerArray->Get(py, m_totalModelSize); -#endif - m_reportTimer.Stop(); - if (m_traceLevel > 3) - { - double time = m_reportTimer.ElapsedSeconds(); - fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time); - } - m_reportTimer.Restart(); - i = 0; - for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) - { - ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); - Microsoft::MSR::CNTK::Matrix &mat = node->Value(); - - ElemType * px = m_cpuAsyncBuffer[0] + m_tableOffsets[i]; - mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), px); - } - m_reportTimer.Stop(); - if (m_traceLevel > 3) - { - double time = m_reportTimer.ElapsedSeconds(); - fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time); - } - } - return true; -} - -// ----------------------------------------------------------------------- -// PushModel() -- Push parameters of learnableNodes to parameter servers -// ----------------------------------------------------------------------- -void PushModel(const std::list & learnableNode) -{ - -} - -// ----------------------------------------------------------------------- -// PullModel() -- Pull parameters of learnableNodes from parameter servers -// ----------------------------------------------------------------------- -void PullModel(const std::list & learnableNode) -{ - -} - -// ----------------------------------------------------------------------- -// WaitAll() -- Wait(Barrier) all the other nodes to process -// ----------------------------------------------------------------------- -void WaitAll() -{ - multiverso::MV_Barrier(); -} - -void WaitAsyncBuffer() -{ - if (m_aysncBufferThread != nullptr && 
m_aysncBufferThread->joinable()) - { - m_aysncBufferThread->join(); - delete m_aysncBufferThread; - m_aysncBufferThread = nullptr; - } -} - -private: -// ----------------------------------------------------------------------- -// WaitAll() -- Wait(Barrier) all the other nodes to process -// ----------------------------------------------------------------------- -void MultiversoInit(const std::list & learnableNodes) -{ - assert(!m_isInitialized); - m_isInitialized = true; - - // parameter server offer vary of updaters, we only use the SGD updater for this simple case. - multiverso::SetCMDFlag(std::string("updater_type"), std::string("sgd")); - multiverso::MV_Init(); - -#ifdef MULTIVERSO_USE_MATRIXTABLE - for (int i = 0; i < m_localBufferNum; i++) - { - m_getOptions.push_back(new multiverso::GetOption()); - m_getOptions.at(i)->set_worker_id(m_localBufferNum * multiverso::MV_WorkerId() + i); - m_addOptions.push_back(new multiverso::AddOption()); - m_addOptions.at(i)->set_worker_id(m_localBufferNum * multiverso::MV_WorkerId() + i); - } - - m_matrixMap = new std::vector< multiverso::MatrixWorker*>(); - m_serverMap = new std::vector< multiverso::MatrixServer*>(); - - // weights - std::wstring sparse_tag{ L"Sparse" }; -#endif - int i = 0; - for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) - { - ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); - Matrix &mat = node->Value(); - size_t layerSize = mat.GetNumElements(); -#ifdef MULTIVERSO_USE_MATRIXTABLE - size_t layerRowSize = mat.GetNumRows(); - size_t layerColSize = mat.GetNumCols(); - std::wstring nodeName = node->NodeName(); - auto found = nodeName.find(sparse_tag); - m_isSparseArray.push_back(false); - - if (found != std::string::npos) - { - m_isSparseArray[i] = true; - fprintf(stderr, "Layer %ls using sparseMatrix. 
row size: %d, col size: %d\n", nodeName.c_str(), (int)layerColSize, (int)layerRowSize); - fflush(stderr); - m_matrixMap->push_back(new multiverso::MatrixWorker(layerColSize, layerRowSize, true)); - m_serverMap->push_back(new multiverso::MatrixServer(layerColSize, layerRowSize, true, m_useAsyncBuffered)); - - } - else - { - m_isSparseArray[i] = false; - m_matrixMap->push_back(new multiverso::MatrixWorker(layerRowSize, layerColSize, false)); - m_serverMap->push_back(new multiverso::MatrixServer(layerRowSize, layerColSize, false, m_useAsyncBuffered)); - } -#endif - - m_tableLength.push_back(layerSize); - } - - m_tableCount = m_tableLength.size(); - - // cacluate total of learnable node's size - m_totalModelSize = accumulate(m_tableLength.begin(), m_tableLength.end(), 0); - -#ifndef MULTIVERSO_USE_MATRIXTABLE - m_serverArray = new multiverso::ArrayServer(m_totalModelSize); - m_workerArray = new multiverso::ArrayWorker(m_totalModelSize); -#endif - - multiverso::MV_Barrier(); - - size_t idx = 0; - for (size_t len : m_tableLength) - { - m_tableOffsets.push_back(idx); - idx += len; - } - -#ifndef CPUONLY - for (int i = 0; i < m_localBufferNum; i++) - m_gpuAsyncBuffer[i].reserve(m_tableCount); - - // create pinned memory - for (int i = 0; i < m_localBufferNum; ++i) - CudaErrorCheck(cudaMallocHost((void **)&m_cpuAsyncBuffer[i], sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable)); - - CudaErrorCheck(cudaMallocHost((void **)&m_deltaArray, sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable)); -#else - for (int i = 0; i < m_localBufferNum; i++) - m_cpuAsyncBuffer[i] = new ElemType[m_totalModelSize]; -#endif -} - -float DecayCoefficient() -{ - float f = 1.f; - switch (m_adjustLearningRateAtBeginningType) - { - case AdjustLearningRateAtBeginning::None: - break; - case AdjustLearningRateAtBeginning::Linearly: - f = min(f, max(0.f, (float)(m_adjustCoefficient + (1 - m_adjustCoefficient) / m_adjustMBNumber * m_parameterSyncCounter))); - break; - case 
AdjustLearningRateAtBeginning::Staircase: - f = min(f, max(0.f, (float)(m_adjustCoefficient * (m_parameterSyncCounter / m_adjustMBNumber + 1)))); - break; - default: - break; - } - return f; -} - -float ModelAggregationCoefficient(size_t samplesSinceLastSync) -{ - float factor = 0; - int nTotalSamples = samplesSinceLastSync; - // TODO[qiwye] will conflict with multiverso - // m_pMPI->AllReduce(&nTotalSamples, 1); - - if (nTotalSamples <= 0) - { - factor = 1.0f / m_pMPI->NumNodesInUse(); - // give an estimated one - } - else - { - factor = (samplesSinceLastSync + 0.0f) / nTotalSamples; - } - factor = 1.0f / m_pMPI->NumNodesInUse(); - return factor; -} - -inline void transpose(ElemType *src, ElemType *dst, const int N, const int M) -{ - for (auto n = 0; n < N*M; n++) { - auto i = n / N; - auto j = n%N; - dst[n] = src[M*j + i]; - } -} - -void ReportPerfStats(size_t totalSamplesProcessedSinceLastReport, - size_t localSamplesProcessedSinceLastReport) -{ - m_reportTimer.Stop(); - double secondsSinceLastReport = m_reportTimer.ElapsedSeconds(); - m_reportTimer.Restart(); - - float totalThroughput = secondsSinceLastReport > 0 ? 
(float)totalSamplesProcessedSinceLastReport / ((float)secondsSinceLastReport * 1000.0f) : 0.0f; - float throughputPerWorker = totalThroughput / m_totalClientNumber; - - string prefix = "\t\t(sim-model aggregation stats) %d-th sync: %8.2f seconds since last report ; %d samples processed by %d workers (%d by me);\n" - "\t\t(sim-model aggregation stats) %d-th sync: totalThroughput = %.2fk samplesPerSecond , throughputPerWorker = %.2fk samplesPerSecond\n"; - fprintf(stderr, prefix.c_str(), (int)m_parameterSyncCounter, secondsSinceLastReport, (int)totalSamplesProcessedSinceLastReport, (int)m_totalClientNumber, (int)localSamplesProcessedSinceLastReport, - (int)m_parameterSyncCounter, totalThroughput, throughputPerWorker); - m_sampleSinceLastReport = 0; - -} - -#ifdef MULTIVERSO_USE_MATRIXTABLE -std::vector*>* m_matrixMap; -std::vector*>* m_serverMap; -std::vector m_isSparseArray; -// Todo(qiwye): using ArrayTable for less comunications between servers and workers. -#else -multiverso::ArrayServer* m_serverArray; -multiverso::ArrayWorker* m_workerArray; -#endif - -thread * m_aysncBufferThread; -bool m_isInitialized; -bool m_doesEveryNodesShouldSynced; -bool m_ModelAveragingSGDSimulating; - -int m_totalClientNumber; -int m_traceLevel; -int m_syncPerfStats; -Timer m_reportTimer; -size_t m_parameterSyncCounter; -size_t m_sampleSinceLastReport; - -bool m_useAsyncBuffered; -int m_localBufferNum; -int * m_bufferSwapIndex; -int m_bufferIndexInUse; -std::vector< multiverso::GetOption*> m_getOptions; // used by sparse table -std::vector< multiverso::AddOption*> m_addOptions; // used by sparse table - - -AdjustLearningRateAtBeginning m_adjustLearningRateAtBeginningType; -double m_adjustCoefficient; -size_t m_adjustMBNumber; - -vector m_tableLength; -size_t m_totalModelSize; -vector m_tableOffsets; -//shared_ptr m_deltaArray; -ElemType * m_deltaArray; -//std::vector > m_cpuAsyncBuffer; -ElemType ** m_cpuAsyncBuffer; - -MPIWrapperPtr m_pMPI; - -// GPU double buffer -std::vector >> 
m_gpuAsyncBuffer; -int m_tableCount; - -#ifndef CPUONLY -cudaStream_t _commStream; -#endif -}; -}}} diff --git a/Source/Common/Include/NoMultiversoWrapper.h b/Source/Common/Include/NoMultiversoWrapper.h deleted file mode 100644 index 563648c29..000000000 --- a/Source/Common/Include/NoMultiversoWrapper.h +++ /dev/null @@ -1,46 +0,0 @@ -// -// -// -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE.md file in the project root for full license information. -// -// -// -#pragma once - -#include "ASGDCommon.h" - -namespace Microsoft { namespace MSR { namespace CNTK { - -template -class MultiversoHelper -{ -public: - MultiversoHelper(const std::list & learnableNodes, - int nodeNumRanks, - bool useAsyncBuffered = true, - bool isSimModelAveragingSGD = false, - AdjustLearningRateAtBeginning adjusttype = AdjustLearningRateAtBeginning::None, - double adjustcoef = 0.2, - size_t adjustnbmb = 600, - int traceLevel = 0, - int syncPerfStats = 0, - const MPIWrapperPtr& pMPI = nullptr) { } - - ~MultiversoHelper() { } - - void InitModel(const std::list & learnableNode) { } - - void PushAndPullModel(const std::list & learnableNodes, size_t sampleSinceLastSynced = 0) { } - - void PushModel(const std::list & learnableNode) { } - - void PullModel(const std::list & learnableNode) { } - - void WaitAll() { } - - void WaitAsyncBuffer() { } - }; -} -} -} diff --git a/Source/SGDLib/ASGDHelper.cpp b/Source/SGDLib/ASGDHelper.cpp new file mode 100644 index 000000000..e053ba338 --- /dev/null +++ b/Source/SGDLib/ASGDHelper.cpp @@ -0,0 +1,631 @@ +// +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE.md file in the project root for full license information. +// +// ASGDHelper.cpp : Implements ASGDHelper interface. The implementation is based on Multiverso. 
+// + +#include "ASGDHelper.h" +#include "MPIWrapper.h" +#include "ComputationNetwork.h" +#include "TimerUtility.h" +#include "ASGDCommon.h" + +#include +#include +#include +#include +#include + +#ifdef ASGD_PARALLEL_SUPPORT + +#include +#include +#include +#include +#include + +#pragma comment(lib, "Multiverso.lib") + +#endif + + +#ifndef CPUONLY +#include +#pragma comment (lib, "cudart.lib") // for cudaMemcpyAsync() +#endif + +namespace Microsoft { namespace MSR { namespace CNTK { + +#ifndef CPUONLY +#define CudaErrorCheck(ans) { gpuAssert((ans), __FILE__, __LINE__); } + +inline void gpuAssert(cudaError_t code, const char *file, int line, bool abort = true) +{ + if (code != cudaSuccess) + { + fprintf(stderr, "GPUassert: %s %s %d\n", cudaGetErrorString(code), file, line); + if (abort) exit(code); + } +} + +#endif + +#ifdef ASGD_PARALLEL_SUPPORT + +// MultiversoHelper is the implementation of ASGDHelper interface with Multiverso +template +class MultiversoHelper : public ASGDHelper +{ +public: + MultiversoHelper(const std::list & learnableNodes, // Parameters that needs to be train + size_t nodeNumRanks, // Number of working nodes + bool useAsyncBuffered = true, // Using asynchonous buffer to hide communication cost + bool isSimulatedModelAveragingSGD = false, // Using parameter server-based MA rather than ASGD + AdjustLearningRateAtBeginning adjusttype = AdjustLearningRateAtBeginning::None, // Adjust learning per minibatches at very begining of training process + // this could be used to tackle the unstableness of ASGD + double adjustCoef = 0.2, // see in DecayCoefficient() + size_t adjustPerMinibatches = 600, // + int traceLevel = 0, // log level + int syncPerfStats = 0, // shown perf data every syncPerfStats + const MPIWrapperPtr& pMPI = nullptr) : + m_parameterSyncCounter(0), m_adjustLearningRateAtBeginningType(adjusttype), + m_adjustCoefficient(adjustCoef), m_adjustMBNumber(adjustPerMinibatches), + m_totalClientNumber(nodeNumRanks), 
m_useAsyncBuffered(useAsyncBuffered), + m_traceLevel(traceLevel), m_ModelAveragingSGDSimulating(isSimulatedModelAveragingSGD), m_doesEveryNodesShouldSynced(false), + m_pMPI(pMPI), m_syncPerfStats(syncPerfStats) + { + if (m_ModelAveragingSGDSimulating) + { + m_doesEveryNodesShouldSynced = true; + m_useAsyncBuffered = false; + } + // Pipeline releated variables + m_localBufferNum = m_useAsyncBuffered ? 2 : 1; + m_bufferSwapIndex = new int[m_localBufferNum]; + + // CPU asynchronous buffer + m_cpuAsyncBuffer = new ElemType*[m_localBufferNum]; + + // Get option used by multiverso sparse update + m_getOptions.reserve(m_localBufferNum); + m_addOptions.reserve(m_localBufferNum); + +#ifndef CPUONLY + // GPU asynchronous buffer + m_gpuAsyncBuffer.resize(m_localBufferNum); + // creat an communication stream for the data tranfer between GPU and CPU + CudaErrorCheck(cudaStreamCreate(&_commStream)); +#endif + m_bufferIndexInUse = 0; + for (int i = 0; i < m_localBufferNum; i++) + m_bufferSwapIndex[i] = (i + 1) % m_localBufferNum; + + m_aysncBufferThread = nullptr; + + if (m_traceLevel > 5) + multiverso::Log::ResetLogLevel(multiverso::LogLevel::Debug); + else if (m_traceLevel > 4) + multiverso::Log::ResetLogLevel(multiverso::LogLevel::Error); + + if (m_doesEveryNodesShouldSynced) + multiverso::SetCMDFlag("sync", true); + + MultiversoInit(learnableNodes); + } + + ~MultiversoHelper() + { + fprintf(stderr, "~MultiversoHelper\n"); + fflush(stderr); + + if (m_useAsyncBuffered && m_aysncBufferThread != nullptr && m_aysncBufferThread->joinable()) + m_aysncBufferThread->join(); + + delete m_bufferSwapIndex, m_deltaArray; + + for (size_t i = 0; i < m_localBufferNum; i++) + { +#ifndef CPUONLY + CudaErrorCheck(cudaFreeHost(m_cpuAsyncBuffer[i])); +#else + delete m_cpuAsyncBuffer[i]; +#endif + } + delete m_cpuAsyncBuffer; +#ifndef CPUONLY + CudaErrorCheck(cudaStreamDestroy(_commStream)); +#endif + multiverso::MV_ShutDown(false); + } + + void InitModel(const std::list & learnableNodes) override 
+ { + float factor = 1.0f / m_totalClientNumber; + + int i = 0; // indicate the index of learnable nodes + for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) + { + ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); + Matrix &mat = node->Value(); + +#ifndef CPUONLY + for (int j = 0; j < m_localBufferNum; j++) + m_gpuAsyncBuffer[j].push_back(mat.DeepClone()); +#endif + ElemType* px = m_cpuAsyncBuffer[0] + m_tableOffsets[i]; + mat.CopyToArray(px, m_tableLength[i]); + } + + for (int i = 1; i < m_localBufferNum; i++) + memcpy(m_cpuAsyncBuffer[i], m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize); + + memcpy(m_deltaArray, m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize); + + // because the parameter server will minus the delta on the server, so that we should send the minus initial model to the server. + std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), -factor)); + + m_workerArray->Add(m_deltaArray, m_totalModelSize); + m_workerArray->Get(m_deltaArray, m_totalModelSize); + WaitAll(); + m_workerArray->Get(m_deltaArray, m_totalModelSize); + + if (std::equal(m_deltaArray, m_deltaArray + m_totalModelSize, m_cpuAsyncBuffer[0])) + multiverso::Log::Info("multiverso initial model loaded.\n"); + m_reportTimer.Start(); + } + + bool PushAndPullModel(const std::list & learnableNodes, size_t sampleSinceLastSynced) override + { + m_parameterSyncCounter++; + + double fromCPUToGPUTime; + double fromGPUToCPUTime; + double networkTime; + double swapTimeOnGPU; + m_reportTimer.Restart(); + WaitAsyncBuffer(); + m_reportTimer.Stop(); + + // reset statics for profiling + if (m_traceLevel > 2 && m_syncPerfStats > 0 && m_parameterSyncCounter % m_syncPerfStats == 0) + { + fromCPUToGPUTime = 0; + fromGPUToCPUTime = 0; + networkTime = 0; + swapTimeOnGPU = 0; + } + + m_bufferIndexInUse = m_bufferSwapIndex[m_bufferIndexInUse]; + + int i = 0; // indicate the index of 
learnable nodes + if (m_useAsyncBuffered) + { + m_reportTimer.Restart(); + for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) + { + ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); + Microsoft::MSR::CNTK::Matrix &mat = node->Value(); +#ifndef CPUONLY + // CNTK model -> GPU buffer + CudaErrorCheck(cudaMemcpy(m_gpuAsyncBuffer[m_bufferIndexInUse][i].Data(), + mat.Data(), + mat.GetNumElements() * sizeof(ElemType), + cudaMemcpyDeviceToDevice)); + + // GPU buffer -> CNTK model + CudaErrorCheck(cudaMemcpy(mat.Data(), + m_gpuAsyncBuffer[m_bufferSwapIndex[m_bufferIndexInUse]][i].Data(), + mat.GetNumElements() * sizeof(ElemType), + cudaMemcpyDeviceToDevice)); +#else + ElemType * px = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[i]; + mat.CopyToArray(px, m_tableLength[i]); + ElemType * py = m_cpuAsyncBuffer[m_bufferSwapIndex[m_bufferIndexInUse]] + m_tableOffsets[i]; + mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), py); + delete px; +#endif + } + m_reportTimer.Stop(); + if (m_traceLevel > 2) + { + swapTimeOnGPU = m_reportTimer.ElapsedSeconds(); + } +#ifndef CPUONLY + m_aysncBufferThread = new thread([&]() + { + float factor = DecayCoefficient(); + int deviceId = m_gpuAsyncBuffer[m_bufferIndexInUse][0].GetDeviceId(); + + CudaErrorCheck(cudaSetDevice(deviceId)); + + Timer threadTimer; + threadTimer.Restart(); + for (int widx = 0; widx < m_tableCount; widx++) + { + ElemType * px = m_deltaArray + m_tableOffsets[widx]; + // GPU buffer -> CPU buffer + CudaErrorCheck(cudaMemcpyAsync(px, + m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(), + m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType), + cudaMemcpyDeviceToHost, + _commStream)); + } + // waiting copy from GPU to CPU has finished + CudaErrorCheck(cudaStreamSynchronize(_commStream)); + threadTimer.Stop(); + + if (m_traceLevel > 3) + { + double time = threadTimer.ElapsedSeconds(); + fprintf(stderr, "\t\t -- 
pullAndRequest, GPU -> CPU time %lf \n", time); + } + + // delta = gradient * learning_rate + std::transform(m_cpuAsyncBuffer[m_bufferIndexInUse], + m_cpuAsyncBuffer[m_bufferIndexInUse] + m_totalModelSize, + m_deltaArray, m_deltaArray, + std::minus()); + + threadTimer.Restart(); + // lr decay + std::transform(m_deltaArray, + m_deltaArray + m_totalModelSize, + m_deltaArray, + std::bind1st(std::multiplies(), factor)); + + + ElemType* px = m_deltaArray; + ElemType* py = m_cpuAsyncBuffer[m_bufferIndexInUse]; + m_workerArray->AddAsync(px, m_totalModelSize); + m_workerArray->Get(py, m_totalModelSize); + + threadTimer.Stop(); + if (m_traceLevel > 3) + { + double time = threadTimer.ElapsedSeconds(); + fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time); + } + + threadTimer.Restart(); + // copy parameters from CPU buffer to GPU buffer + for (int widx = 0; widx < m_tableCount; widx++) + { + ElemType * py = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[widx]; + + CudaErrorCheck(cudaMemcpyAsync(m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(), + py, + m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType), + cudaMemcpyHostToDevice, + _commStream)); + } + CudaErrorCheck(cudaStreamSynchronize(_commStream)); + threadTimer.Stop(); + if (m_traceLevel > 3) + { + double time = threadTimer.ElapsedSeconds(); + fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time); + } + }); +#else + m_aysncBufferThread = new thread([&]() + { + float factor = DecayCoefficient(); + int t_cacheIdx = m_bufferIndexInUse; + + std::transform(m_cpuAsyncBuffer[t_cacheIdx], m_cpuAsyncBuffer[t_cacheIdx] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus()); + std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), factor)); + + ElemType* px = m_deltaArray; + ElemType* py = m_cpuAsyncBuffer[t_cacheIdx]; + m_workerArray->AddAsync(px, m_totalModelSize); + 
m_workerArray->Get(py, m_totalModelSize); + + }); +#endif + } + else + { + m_reportTimer.Restart(); + float factor = DecayCoefficient(); + i = 0; + for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) + { + ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); + Microsoft::MSR::CNTK::Matrix &mat = node->Value(); + + ElemType * px = m_deltaArray + m_tableOffsets[i]; + mat.CopyToArray(px, m_tableLength[i]); + } + + m_reportTimer.Stop(); + if (m_traceLevel > 3) + { + double time = m_reportTimer.ElapsedSeconds(); + fprintf(stderr, "\t\t -- pullAndRequest, GPU -> CPU time %lf \n", time); + } + std::transform(m_cpuAsyncBuffer[0], m_cpuAsyncBuffer[0] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus()); + + // lr decay + if (m_ModelAveragingSGDSimulating) + { + factor = ModelAggregationCoefficient(sampleSinceLastSynced); + std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), factor)); + if (m_traceLevel > 2 && m_syncPerfStats != 0) + { + if (m_parameterSyncCounter % m_syncPerfStats == 0) + ReportPerfStats(m_totalClientNumber * m_sampleSinceLastReport, m_sampleSinceLastReport); + else + m_sampleSinceLastReport += sampleSinceLastSynced; + } + + } + else + { + std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), factor)); + } + m_reportTimer.Restart(); + + ElemType* px = m_deltaArray; + ElemType* py = m_cpuAsyncBuffer[0]; + m_workerArray->AddAsync(px, m_totalModelSize); + m_workerArray->Get(py, m_totalModelSize); + + m_reportTimer.Stop(); + if (m_traceLevel > 3) + { + double time = m_reportTimer.ElapsedSeconds(); + fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time); + } + m_reportTimer.Restart(); + i = 0; + for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) + { + ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); + 
Microsoft::MSR::CNTK::Matrix &mat = node->Value(); + + ElemType * px = m_cpuAsyncBuffer[0] + m_tableOffsets[i]; + mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), px); + } + m_reportTimer.Stop(); + if (m_traceLevel > 3) + { + double time = m_reportTimer.ElapsedSeconds(); + fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time); + } + } + return true; + } + + void WaitAll() override + { + multiverso::MV_Barrier(); + } + + void WaitAsyncBuffer() override + { + if (m_aysncBufferThread != nullptr && m_aysncBufferThread->joinable()) + { + m_aysncBufferThread->join(); + delete m_aysncBufferThread; + m_aysncBufferThread = nullptr; + } + } + +private: + void MultiversoInit(const std::list & learnableNodes) + { + assert(!m_isInitialized); + m_isInitialized = true; + + // parameter server offer vary of updaters, we only use the SGD updater for this simple case. + multiverso::SetCMDFlag(std::string("updater_type"), std::string("sgd")); + multiverso::MV_Init(); + + int i = 0; + for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) + { + ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); + Matrix &mat = node->Value(); + size_t layerSize = mat.GetNumElements(); + + m_tableLength.push_back(layerSize); + } + + m_tableCount = m_tableLength.size(); + + // cacluate total of learnable node's size + m_totalModelSize = accumulate(m_tableLength.begin(), m_tableLength.end(), 0); + + + multiverso::MV_Barrier(); + + size_t idx = 0; + for (size_t len : m_tableLength) + { + m_tableOffsets.push_back(idx); + idx += len; + } + +#ifndef CPUONLY + for (int i = 0; i < m_localBufferNum; i++) + m_gpuAsyncBuffer[i].reserve(m_tableCount); + + // create pinned memory + for (int i = 0; i < m_localBufferNum; ++i) + CudaErrorCheck(cudaMallocHost((void **)&m_cpuAsyncBuffer[i], sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable)); + + CudaErrorCheck(cudaMallocHost((void **)&m_deltaArray, sizeof(ElemType) * 
(m_totalModelSize), cudaHostAllocPortable)); +#else + for (int i = 0; i < m_localBufferNum; i++) + m_cpuAsyncBuffer[i] = new ElemType[m_totalModelSize]; +#endif + } + + float DecayCoefficient() + { + float f = 1.f; + switch (m_adjustLearningRateAtBeginningType) + { + case AdjustLearningRateAtBeginning::None: + break; + case AdjustLearningRateAtBeginning::Linearly: + f = min(f, max(0.f, (float)(m_adjustCoefficient + (1 - m_adjustCoefficient) / m_adjustMBNumber * m_parameterSyncCounter))); + break; + case AdjustLearningRateAtBeginning::Staircase: + f = min(f, max(0.f, (float)(m_adjustCoefficient * (m_parameterSyncCounter / m_adjustMBNumber + 1)))); + break; + default: + break; + } + return f; + } + + float ModelAggregationCoefficient(size_t samplesSinceLastSync) + { + float factor = 0; + int nTotalSamples = samplesSinceLastSync; + // TODO[qiwye] will conflict with multiverso + // m_pMPI->AllReduce(&nTotalSamples, 1); + + if (nTotalSamples <= 0) + { + factor = 1.0f / m_pMPI->NumNodesInUse(); + // give an estimated one + } + else + { + factor = (samplesSinceLastSync + 0.0f) / nTotalSamples; + } + factor = 1.0f / m_pMPI->NumNodesInUse(); + return factor; + } + + inline void transpose(ElemType *src, ElemType *dst, const int N, const int M) + { + for (auto n = 0; n < N*M; n++) { + auto i = n / N; + auto j = n%N; + dst[n] = src[M*j + i]; + } + } + + void ReportPerfStats(size_t totalSamplesProcessedSinceLastReport, + size_t localSamplesProcessedSinceLastReport) + { + m_reportTimer.Stop(); + double secondsSinceLastReport = m_reportTimer.ElapsedSeconds(); + m_reportTimer.Restart(); + + float totalThroughput = secondsSinceLastReport > 0 ? 
(float)totalSamplesProcessedSinceLastReport / ((float)secondsSinceLastReport * 1000.0f) : 0.0f; + float throughputPerWorker = totalThroughput / m_totalClientNumber; + + string prefix = "\t\t(sim-model aggregation stats) %d-th sync: %8.2f seconds since last report ; %d samples processed by %d workers (%d by me);\n" + "\t\t(sim-model aggregation stats) %d-th sync: totalThroughput = %.2fk samplesPerSecond , throughputPerWorker = %.2fk samplesPerSecond\n"; + fprintf(stderr, prefix.c_str(), (int)m_parameterSyncCounter, secondsSinceLastReport, (int)totalSamplesProcessedSinceLastReport, (int)m_totalClientNumber, (int)localSamplesProcessedSinceLastReport, + (int)m_parameterSyncCounter, totalThroughput, throughputPerWorker); + m_sampleSinceLastReport = 0; + + } + + multiverso::ArrayServer* m_serverArray; + multiverso::ArrayWorker* m_workerArray; + + thread * m_aysncBufferThread; + bool m_isInitialized; + bool m_doesEveryNodesShouldSynced; + bool m_ModelAveragingSGDSimulating; + + int m_totalClientNumber; + int m_traceLevel; + int m_syncPerfStats; + Timer m_reportTimer; + size_t m_parameterSyncCounter; + size_t m_sampleSinceLastReport; + + bool m_useAsyncBuffered; + int m_localBufferNum; + int * m_bufferSwapIndex; + int m_bufferIndexInUse; + std::vector m_getOptions; // used by sparse table + std::vector m_addOptions; // used by sparse table + + + AdjustLearningRateAtBeginning m_adjustLearningRateAtBeginningType; + double m_adjustCoefficient; + size_t m_adjustMBNumber; + + vector m_tableLength; + size_t m_totalModelSize; + vector m_tableOffsets; + //shared_ptr m_deltaArray; + ElemType * m_deltaArray; + //std::vector > m_cpuAsyncBuffer; + ElemType ** m_cpuAsyncBuffer; + + MPIWrapperPtr m_pMPI; + + // GPU double buffer + std::vector >> m_gpuAsyncBuffer; + int m_tableCount; + +#ifndef CPUONLY + cudaStream_t _commStream; +#endif +}; // Class MultiversoHelper + +#endif + +// A None implementation of ASGDHelper interface which does nothing +// This is used when CNTK_ENABLE_ASGD = 
false +template +class NoneASGDHelper : public ASGDHelper +{ +public: + NoneASGDHelper(const std::list & learnableNodes, + int nodeNumRanks, + bool useAsyncBuffered = true, + bool isSimModelAveragingSGD = false, + AdjustLearningRateAtBeginning adjusttype = AdjustLearningRateAtBeginning::None, + double adjustcoef = 0.2, + size_t adjustnbmb = 600, + int traceLevel = 0, + int syncPerfStats = 0, + const MPIWrapperPtr& pMPI = nullptr) { } + + ~NoneASGDHelper() { } + + void InitModel(const std::list & learnableNode) override { } + + void PushAndPullModel(const std::list & learnableNodes, size_t sampleSinceLastSynced) override { } + + void WaitAll() override { } + + void WaitAsyncBuffer() override { } +}; + +template +ASGDHelper* NewASGDHelper( + const std::list & learnableNodes, // Parameters that needs to be train + size_t nodeNumRanks, // Number of working nodes + bool useAsyncBuffered, // Using asynchonous buffer to hide communication cost + bool isSimulatedModelAveragingSGD, + AdjustLearningRateAtBeginning adjusttype, + double adjustCoef, + size_t adjustPerMinibatches, + int traceLevel, + int syncPerfStats, + const MPIWrapperPtr& pMPI) +{ +#ifdef ASGD_PARALLEL_SUPPORT + return MultiversoHelper(learnableNodes, nodeNumRanks, useAsyncBuffered, isSimulatedModelAveragingSGD, + adjusttype, adjustCoef, adjustPerMinibatches, traceLevel, syncPerfStats, pMPI); +#elif + return NoASGDHelper(learnableNodes, nodeNumRanks, useAsyncBuffered, isSimulatedModelAveragingSGD, + adjusttype, adjustCoef, adjustPerMinibatches, traceLevel, syncPerfStats, pMPI); +#endif +} + +} // namespace CNTK +} // namespace MSR +} // namespace Microsoft \ No newline at end of file diff --git a/Source/SGDLib/SGD.cpp b/Source/SGDLib/SGD.cpp index b2f8e5545..c6aceaee6 100644 --- a/Source/SGDLib/SGD.cpp +++ b/Source/SGDLib/SGD.cpp @@ -18,11 +18,7 @@ #include "V2AllReduceDistGradAggregator.h" #endif -#ifdef ASGD_PARALLEL_SUPPORT -#include "MultiversoWrapper.h" -#else -#include "NoMultiversoWrapper.h" -#endif 
+#include "ASGDHelper.h" #include "SimpleDistGradAggregator.h" #include "V2SimpleDistGradAggregator.h" @@ -405,8 +401,7 @@ void SGD::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net, // Multiverso Warpper for ASGD logic init if (m_parallelizationMethod == ParallelizationMethod::dataParallelASGD) { - m_pMultiversoHelper = - std::make_shared>(learnableNodes, + m_pASGDHelper.reset(NewASGDHelper(learnableNodes, m_mpi->NumNodesInUse(), m_isPipeline, m_isSimulateMA, @@ -415,10 +410,10 @@ void SGD::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net, m_adjustPerMinibatches, m_traceLevel, m_syncStatsTrace, - m_mpi); - m_pMultiversoHelper->InitModel(learnableNodes); - m_pMultiversoHelperBarrier = false; - m_pMultiversoHelper->WaitAll(); + m_mpi)); + m_pASGDHelper->InitModel(learnableNodes); + m_pASGDHelperBarrier = false; + m_pASGDHelper->WaitAll(); } // --- MAIN EPOCH LOOP @@ -825,7 +820,7 @@ void SGD::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net, delete inputMatrices; if (m_parallelizationMethod == ParallelizationMethod::dataParallelASGD) - m_pMultiversoHelper.reset(); + m_pASGDHelper.reset(); } // ----------------------------------------------------------------------- @@ -1282,7 +1277,7 @@ size_t SGD::TrainOneEpoch(ComputationNetworkPtr net, if (nSamplesSinceLastModelSync >= m_nFramesBetweenASGDSync[epochNumber]) { - m_pMultiversoHelper->PushAndPullModel(learnableNodes, nSamplesSinceLastModelSync); + m_pASGDHelper->PushAndPullModel(learnableNodes, nSamplesSinceLastModelSync); nSamplesSinceLastModelSync = 0; } } @@ -1411,7 +1406,7 @@ size_t SGD::TrainOneEpoch(ComputationNetworkPtr net, if (useAsyncGradientAggregation && (m_mpi->NumNodesInUse() > 1)) { - m_pMultiversoHelper->PushAndPullModel(learnableNodes, nSamplesSinceLastModelSync); + m_pASGDHelper->PushAndPullModel(learnableNodes, nSamplesSinceLastModelSync); nSamplesSinceLastModelSync = 0; } diff --git a/Source/SGDLib/SGD.h b/Source/SGDLib/SGD.h index 72760b089..1880363f6 100644 --- 
a/Source/SGDLib/SGD.h +++ b/Source/SGDLib/SGD.h @@ -19,7 +19,7 @@ #include #include "Profiler.h" #include "MASGD.h" -#include "ASGDCommon.h" +#include "ASGDHelper.h" using namespace std; // ugh! TODO: get rid of this from .h files!!! #define CNTK_CHECKPOINT_VERSION_1 1 // 1 -> no version number @@ -316,10 +316,6 @@ protected: template class IDistGradAggregator; -// MultiversoHelper is used for parallel training using DataParallelASGD -template -class MultiversoHelper; - // ----------------------------------------------------------------------- // class SGD // ----------------------------------------------------------------------- @@ -579,8 +575,8 @@ protected: private: void MarkDropoutNodesEvalTimeStampAsOutdated(const ComputationNetworkPtr& net, const ComputationNodeBasePtr& criterionNode); - shared_ptr> m_pMultiversoHelper; - bool m_pMultiversoHelperBarrier; + std::shared_ptr> m_pASGDHelper; + bool m_pASGDHelperBarrier; bool UsingGradientAggregation(size_t epochNumber) const { diff --git a/Source/SGDLib/SGDLib.vcxproj b/Source/SGDLib/SGDLib.vcxproj index fd54c0524..d6676c6c9 100644 --- a/Source/SGDLib/SGDLib.vcxproj +++ b/Source/SGDLib/SGDLib.vcxproj @@ -99,13 +99,11 @@ - - - + @@ -143,6 +141,7 @@ + diff --git a/Source/SGDLib/SGDLib.vcxproj.filters b/Source/SGDLib/SGDLib.vcxproj.filters index 1abb2b0a2..9f0e9dc28 100644 --- a/Source/SGDLib/SGDLib.vcxproj.filters +++ b/Source/SGDLib/SGDLib.vcxproj.filters @@ -13,6 +13,9 @@ Stat + + Parallelization + @@ -123,12 +126,6 @@ from ComputationNetworkLib\Nodes - - Parallelization - - - Parallelization - Parallelization @@ -141,7 +138,7 @@ Parallelization - + Parallelization From 060e9f678d3dcb1a7a7a03988e6b25de99ff02d3 Mon Sep 17 00:00:00 2001 From: feiga Date: Tue, 1 Nov 2016 20:36:24 +0800 Subject: [PATCH 3/4] Update Makefile for the new added ASGDHelper.cpp file --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index fd310637f..fc1a11fa9 100644 --- a/Makefile +++ b/Makefile @@ -483,6 
+483,7 @@ SGDLIB_SRC=\ $(SOURCEDIR)/SGDLib/Profiler.cpp \ $(SOURCEDIR)/SGDLib/SGD.cpp \ $(SOURCEDIR)/SGDLib/PostComputingActions.cpp \ + $(SOURCEDIR)/SGDLib/ASGDHelper.cpp \ SGDLIB_SRC+=$(CNTKLIBRARY_COMMON_SRC) From 10428b12fbe7bf625ede9aba81f17fc5b4ac7f02 Mon Sep 17 00:00:00 2001 From: feiga Date: Tue, 1 Nov 2016 20:41:41 +0800 Subject: [PATCH 4/4] Change Tab from 2 spaces to 4 spaces --- Source/SGDLib/ASGDHelper.cpp | 832 +++++++++++++++++------------------ 1 file changed, 416 insertions(+), 416 deletions(-) diff --git a/Source/SGDLib/ASGDHelper.cpp b/Source/SGDLib/ASGDHelper.cpp index e053ba338..2a89d576c 100644 --- a/Source/SGDLib/ASGDHelper.cpp +++ b/Source/SGDLib/ASGDHelper.cpp @@ -58,519 +58,519 @@ template class MultiversoHelper : public ASGDHelper { public: - MultiversoHelper(const std::list & learnableNodes, // Parameters that needs to be train - size_t nodeNumRanks, // Number of working nodes - bool useAsyncBuffered = true, // Using asynchonous buffer to hide communication cost - bool isSimulatedModelAveragingSGD = false, // Using parameter server-based MA rather than ASGD - AdjustLearningRateAtBeginning adjusttype = AdjustLearningRateAtBeginning::None, // Adjust learning per minibatches at very begining of training process - // this could be used to tackle the unstableness of ASGD - double adjustCoef = 0.2, // see in DecayCoefficient() - size_t adjustPerMinibatches = 600, // - int traceLevel = 0, // log level - int syncPerfStats = 0, // shown perf data every syncPerfStats - const MPIWrapperPtr& pMPI = nullptr) : - m_parameterSyncCounter(0), m_adjustLearningRateAtBeginningType(adjusttype), - m_adjustCoefficient(adjustCoef), m_adjustMBNumber(adjustPerMinibatches), - m_totalClientNumber(nodeNumRanks), m_useAsyncBuffered(useAsyncBuffered), - m_traceLevel(traceLevel), m_ModelAveragingSGDSimulating(isSimulatedModelAveragingSGD), m_doesEveryNodesShouldSynced(false), - m_pMPI(pMPI), m_syncPerfStats(syncPerfStats) - { - if (m_ModelAveragingSGDSimulating) - { - 
m_doesEveryNodesShouldSynced = true; - m_useAsyncBuffered = false; - } - // Pipeline releated variables - m_localBufferNum = m_useAsyncBuffered ? 2 : 1; - m_bufferSwapIndex = new int[m_localBufferNum]; + MultiversoHelper(const std::list & learnableNodes, // Parameters that needs to be train + size_t nodeNumRanks, // Number of working nodes + bool useAsyncBuffered = true, // Using asynchonous buffer to hide communication cost + bool isSimulatedModelAveragingSGD = false, // Using parameter server-based MA rather than ASGD + AdjustLearningRateAtBeginning adjusttype = AdjustLearningRateAtBeginning::None, // Adjust learning per minibatches at very begining of training process + // this could be used to tackle the unstableness of ASGD + double adjustCoef = 0.2, // see in DecayCoefficient() + size_t adjustPerMinibatches = 600, // + int traceLevel = 0, // log level + int syncPerfStats = 0, // shown perf data every syncPerfStats + const MPIWrapperPtr& pMPI = nullptr) : + m_parameterSyncCounter(0), m_adjustLearningRateAtBeginningType(adjusttype), + m_adjustCoefficient(adjustCoef), m_adjustMBNumber(adjustPerMinibatches), + m_totalClientNumber(nodeNumRanks), m_useAsyncBuffered(useAsyncBuffered), + m_traceLevel(traceLevel), m_ModelAveragingSGDSimulating(isSimulatedModelAveragingSGD), m_doesEveryNodesShouldSynced(false), + m_pMPI(pMPI), m_syncPerfStats(syncPerfStats) + { + if (m_ModelAveragingSGDSimulating) + { + m_doesEveryNodesShouldSynced = true; + m_useAsyncBuffered = false; + } + // Pipeline releated variables + m_localBufferNum = m_useAsyncBuffered ? 
2 : 1; + m_bufferSwapIndex = new int[m_localBufferNum]; - // CPU asynchronous buffer - m_cpuAsyncBuffer = new ElemType*[m_localBufferNum]; + // CPU asynchronous buffer + m_cpuAsyncBuffer = new ElemType*[m_localBufferNum]; - // Get option used by multiverso sparse update - m_getOptions.reserve(m_localBufferNum); - m_addOptions.reserve(m_localBufferNum); + // Get option used by multiverso sparse update + m_getOptions.reserve(m_localBufferNum); + m_addOptions.reserve(m_localBufferNum); #ifndef CPUONLY - // GPU asynchronous buffer - m_gpuAsyncBuffer.resize(m_localBufferNum); - // creat an communication stream for the data tranfer between GPU and CPU - CudaErrorCheck(cudaStreamCreate(&_commStream)); + // GPU asynchronous buffer + m_gpuAsyncBuffer.resize(m_localBufferNum); + // creat an communication stream for the data tranfer between GPU and CPU + CudaErrorCheck(cudaStreamCreate(&_commStream)); #endif - m_bufferIndexInUse = 0; - for (int i = 0; i < m_localBufferNum; i++) - m_bufferSwapIndex[i] = (i + 1) % m_localBufferNum; + m_bufferIndexInUse = 0; + for (int i = 0; i < m_localBufferNum; i++) + m_bufferSwapIndex[i] = (i + 1) % m_localBufferNum; - m_aysncBufferThread = nullptr; + m_aysncBufferThread = nullptr; - if (m_traceLevel > 5) - multiverso::Log::ResetLogLevel(multiverso::LogLevel::Debug); - else if (m_traceLevel > 4) - multiverso::Log::ResetLogLevel(multiverso::LogLevel::Error); + if (m_traceLevel > 5) + multiverso::Log::ResetLogLevel(multiverso::LogLevel::Debug); + else if (m_traceLevel > 4) + multiverso::Log::ResetLogLevel(multiverso::LogLevel::Error); - if (m_doesEveryNodesShouldSynced) - multiverso::SetCMDFlag("sync", true); + if (m_doesEveryNodesShouldSynced) + multiverso::SetCMDFlag("sync", true); - MultiversoInit(learnableNodes); - } + MultiversoInit(learnableNodes); + } - ~MultiversoHelper() - { - fprintf(stderr, "~MultiversoHelper\n"); - fflush(stderr); + ~MultiversoHelper() + { + fprintf(stderr, "~MultiversoHelper\n"); + fflush(stderr); - if 
(m_useAsyncBuffered && m_aysncBufferThread != nullptr && m_aysncBufferThread->joinable()) - m_aysncBufferThread->join(); + if (m_useAsyncBuffered && m_aysncBufferThread != nullptr && m_aysncBufferThread->joinable()) + m_aysncBufferThread->join(); - delete m_bufferSwapIndex, m_deltaArray; + delete m_bufferSwapIndex, m_deltaArray; - for (size_t i = 0; i < m_localBufferNum; i++) - { + for (size_t i = 0; i < m_localBufferNum; i++) + { #ifndef CPUONLY - CudaErrorCheck(cudaFreeHost(m_cpuAsyncBuffer[i])); + CudaErrorCheck(cudaFreeHost(m_cpuAsyncBuffer[i])); #else - delete m_cpuAsyncBuffer[i]; + delete m_cpuAsyncBuffer[i]; #endif - } - delete m_cpuAsyncBuffer; + } + delete m_cpuAsyncBuffer; #ifndef CPUONLY - CudaErrorCheck(cudaStreamDestroy(_commStream)); + CudaErrorCheck(cudaStreamDestroy(_commStream)); #endif - multiverso::MV_ShutDown(false); - } + multiverso::MV_ShutDown(false); + } - void InitModel(const std::list & learnableNodes) override - { - float factor = 1.0f / m_totalClientNumber; + void InitModel(const std::list & learnableNodes) override + { + float factor = 1.0f / m_totalClientNumber; - int i = 0; // indicate the index of learnable nodes - for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) - { - ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); - Matrix &mat = node->Value(); + int i = 0; // indicate the index of learnable nodes + for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) + { + ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); + Matrix &mat = node->Value(); #ifndef CPUONLY - for (int j = 0; j < m_localBufferNum; j++) - m_gpuAsyncBuffer[j].push_back(mat.DeepClone()); + for (int j = 0; j < m_localBufferNum; j++) + m_gpuAsyncBuffer[j].push_back(mat.DeepClone()); #endif - ElemType* px = m_cpuAsyncBuffer[0] + m_tableOffsets[i]; - mat.CopyToArray(px, m_tableLength[i]); - } + ElemType* px = m_cpuAsyncBuffer[0] + m_tableOffsets[i]; + 
mat.CopyToArray(px, m_tableLength[i]); + } - for (int i = 1; i < m_localBufferNum; i++) - memcpy(m_cpuAsyncBuffer[i], m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize); + for (int i = 1; i < m_localBufferNum; i++) + memcpy(m_cpuAsyncBuffer[i], m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize); - memcpy(m_deltaArray, m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize); + memcpy(m_deltaArray, m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize); - // because the parameter server will minus the delta on the server, so that we should send the minus initial model to the server. - std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), -factor)); + // because the parameter server will minus the delta on the server, so that we should send the minus initial model to the server. + std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), -factor)); - m_workerArray->Add(m_deltaArray, m_totalModelSize); - m_workerArray->Get(m_deltaArray, m_totalModelSize); - WaitAll(); - m_workerArray->Get(m_deltaArray, m_totalModelSize); + m_workerArray->Add(m_deltaArray, m_totalModelSize); + m_workerArray->Get(m_deltaArray, m_totalModelSize); + WaitAll(); + m_workerArray->Get(m_deltaArray, m_totalModelSize); - if (std::equal(m_deltaArray, m_deltaArray + m_totalModelSize, m_cpuAsyncBuffer[0])) - multiverso::Log::Info("multiverso initial model loaded.\n"); - m_reportTimer.Start(); - } + if (std::equal(m_deltaArray, m_deltaArray + m_totalModelSize, m_cpuAsyncBuffer[0])) + multiverso::Log::Info("multiverso initial model loaded.\n"); + m_reportTimer.Start(); + } - bool PushAndPullModel(const std::list & learnableNodes, size_t sampleSinceLastSynced) override - { - m_parameterSyncCounter++; + bool PushAndPullModel(const std::list & learnableNodes, size_t sampleSinceLastSynced) override + { + m_parameterSyncCounter++; - double fromCPUToGPUTime; - double fromGPUToCPUTime; 
- double networkTime; - double swapTimeOnGPU; - m_reportTimer.Restart(); - WaitAsyncBuffer(); - m_reportTimer.Stop(); + double fromCPUToGPUTime; + double fromGPUToCPUTime; + double networkTime; + double swapTimeOnGPU; + m_reportTimer.Restart(); + WaitAsyncBuffer(); + m_reportTimer.Stop(); - // reset statics for profiling - if (m_traceLevel > 2 && m_syncPerfStats > 0 && m_parameterSyncCounter % m_syncPerfStats == 0) - { - fromCPUToGPUTime = 0; - fromGPUToCPUTime = 0; - networkTime = 0; - swapTimeOnGPU = 0; - } + // reset statics for profiling + if (m_traceLevel > 2 && m_syncPerfStats > 0 && m_parameterSyncCounter % m_syncPerfStats == 0) + { + fromCPUToGPUTime = 0; + fromGPUToCPUTime = 0; + networkTime = 0; + swapTimeOnGPU = 0; + } - m_bufferIndexInUse = m_bufferSwapIndex[m_bufferIndexInUse]; + m_bufferIndexInUse = m_bufferSwapIndex[m_bufferIndexInUse]; - int i = 0; // indicate the index of learnable nodes - if (m_useAsyncBuffered) - { - m_reportTimer.Restart(); - for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) - { - ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); - Microsoft::MSR::CNTK::Matrix &mat = node->Value(); + int i = 0; // indicate the index of learnable nodes + if (m_useAsyncBuffered) + { + m_reportTimer.Restart(); + for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) + { + ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); + Microsoft::MSR::CNTK::Matrix &mat = node->Value(); #ifndef CPUONLY - // CNTK model -> GPU buffer - CudaErrorCheck(cudaMemcpy(m_gpuAsyncBuffer[m_bufferIndexInUse][i].Data(), - mat.Data(), - mat.GetNumElements() * sizeof(ElemType), - cudaMemcpyDeviceToDevice)); + // CNTK model -> GPU buffer + CudaErrorCheck(cudaMemcpy(m_gpuAsyncBuffer[m_bufferIndexInUse][i].Data(), + mat.Data(), + mat.GetNumElements() * sizeof(ElemType), + cudaMemcpyDeviceToDevice)); - // GPU buffer -> CNTK model - CudaErrorCheck(cudaMemcpy(mat.Data(), - 
m_gpuAsyncBuffer[m_bufferSwapIndex[m_bufferIndexInUse]][i].Data(), - mat.GetNumElements() * sizeof(ElemType), - cudaMemcpyDeviceToDevice)); + // GPU buffer -> CNTK model + CudaErrorCheck(cudaMemcpy(mat.Data(), + m_gpuAsyncBuffer[m_bufferSwapIndex[m_bufferIndexInUse]][i].Data(), + mat.GetNumElements() * sizeof(ElemType), + cudaMemcpyDeviceToDevice)); #else - ElemType * px = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[i]; - mat.CopyToArray(px, m_tableLength[i]); - ElemType * py = m_cpuAsyncBuffer[m_bufferSwapIndex[m_bufferIndexInUse]] + m_tableOffsets[i]; - mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), py); - delete px; + ElemType * px = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[i]; + mat.CopyToArray(px, m_tableLength[i]); + ElemType * py = m_cpuAsyncBuffer[m_bufferSwapIndex[m_bufferIndexInUse]] + m_tableOffsets[i]; + mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), py); + delete px; #endif - } - m_reportTimer.Stop(); - if (m_traceLevel > 2) - { - swapTimeOnGPU = m_reportTimer.ElapsedSeconds(); - } + } + m_reportTimer.Stop(); + if (m_traceLevel > 2) + { + swapTimeOnGPU = m_reportTimer.ElapsedSeconds(); + } #ifndef CPUONLY - m_aysncBufferThread = new thread([&]() - { - float factor = DecayCoefficient(); - int deviceId = m_gpuAsyncBuffer[m_bufferIndexInUse][0].GetDeviceId(); + m_aysncBufferThread = new thread([&]() + { + float factor = DecayCoefficient(); + int deviceId = m_gpuAsyncBuffer[m_bufferIndexInUse][0].GetDeviceId(); - CudaErrorCheck(cudaSetDevice(deviceId)); + CudaErrorCheck(cudaSetDevice(deviceId)); - Timer threadTimer; - threadTimer.Restart(); - for (int widx = 0; widx < m_tableCount; widx++) - { - ElemType * px = m_deltaArray + m_tableOffsets[widx]; - // GPU buffer -> CPU buffer - CudaErrorCheck(cudaMemcpyAsync(px, - m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(), - m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType), - cudaMemcpyDeviceToHost, - _commStream)); - } 
- // waiting copy from GPU to CPU has finished - CudaErrorCheck(cudaStreamSynchronize(_commStream)); - threadTimer.Stop(); + Timer threadTimer; + threadTimer.Restart(); + for (int widx = 0; widx < m_tableCount; widx++) + { + ElemType * px = m_deltaArray + m_tableOffsets[widx]; + // GPU buffer -> CPU buffer + CudaErrorCheck(cudaMemcpyAsync(px, + m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(), + m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType), + cudaMemcpyDeviceToHost, + _commStream)); + } + // waiting copy from GPU to CPU has finished + CudaErrorCheck(cudaStreamSynchronize(_commStream)); + threadTimer.Stop(); - if (m_traceLevel > 3) - { - double time = threadTimer.ElapsedSeconds(); - fprintf(stderr, "\t\t -- pullAndRequest, GPU -> CPU time %lf \n", time); - } + if (m_traceLevel > 3) + { + double time = threadTimer.ElapsedSeconds(); + fprintf(stderr, "\t\t -- pullAndRequest, GPU -> CPU time %lf \n", time); + } - // delta = gradient * learning_rate - std::transform(m_cpuAsyncBuffer[m_bufferIndexInUse], - m_cpuAsyncBuffer[m_bufferIndexInUse] + m_totalModelSize, - m_deltaArray, m_deltaArray, - std::minus()); + // delta = gradient * learning_rate + std::transform(m_cpuAsyncBuffer[m_bufferIndexInUse], + m_cpuAsyncBuffer[m_bufferIndexInUse] + m_totalModelSize, + m_deltaArray, m_deltaArray, + std::minus()); - threadTimer.Restart(); - // lr decay - std::transform(m_deltaArray, - m_deltaArray + m_totalModelSize, - m_deltaArray, - std::bind1st(std::multiplies(), factor)); + threadTimer.Restart(); + // lr decay + std::transform(m_deltaArray, + m_deltaArray + m_totalModelSize, + m_deltaArray, + std::bind1st(std::multiplies(), factor)); - ElemType* px = m_deltaArray; - ElemType* py = m_cpuAsyncBuffer[m_bufferIndexInUse]; - m_workerArray->AddAsync(px, m_totalModelSize); - m_workerArray->Get(py, m_totalModelSize); + ElemType* px = m_deltaArray; + ElemType* py = m_cpuAsyncBuffer[m_bufferIndexInUse]; + m_workerArray->AddAsync(px, m_totalModelSize); + 
m_workerArray->Get(py, m_totalModelSize); - threadTimer.Stop(); - if (m_traceLevel > 3) - { - double time = threadTimer.ElapsedSeconds(); - fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time); - } + threadTimer.Stop(); + if (m_traceLevel > 3) + { + double time = threadTimer.ElapsedSeconds(); + fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time); + } - threadTimer.Restart(); - // copy parameters from CPU buffer to GPU buffer - for (int widx = 0; widx < m_tableCount; widx++) - { - ElemType * py = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[widx]; + threadTimer.Restart(); + // copy parameters from CPU buffer to GPU buffer + for (int widx = 0; widx < m_tableCount; widx++) + { + ElemType * py = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[widx]; - CudaErrorCheck(cudaMemcpyAsync(m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(), - py, - m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType), - cudaMemcpyHostToDevice, - _commStream)); - } - CudaErrorCheck(cudaStreamSynchronize(_commStream)); - threadTimer.Stop(); - if (m_traceLevel > 3) - { - double time = threadTimer.ElapsedSeconds(); - fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time); - } - }); + CudaErrorCheck(cudaMemcpyAsync(m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(), + py, + m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType), + cudaMemcpyHostToDevice, + _commStream)); + } + CudaErrorCheck(cudaStreamSynchronize(_commStream)); + threadTimer.Stop(); + if (m_traceLevel > 3) + { + double time = threadTimer.ElapsedSeconds(); + fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time); + } + }); #else - m_aysncBufferThread = new thread([&]() - { - float factor = DecayCoefficient(); - int t_cacheIdx = m_bufferIndexInUse; + m_aysncBufferThread = new thread([&]() + { + float factor = DecayCoefficient(); + int t_cacheIdx = m_bufferIndexInUse; - 
std::transform(m_cpuAsyncBuffer[t_cacheIdx], m_cpuAsyncBuffer[t_cacheIdx] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus()); - std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), factor)); + std::transform(m_cpuAsyncBuffer[t_cacheIdx], m_cpuAsyncBuffer[t_cacheIdx] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus()); + std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), factor)); - ElemType* px = m_deltaArray; - ElemType* py = m_cpuAsyncBuffer[t_cacheIdx]; - m_workerArray->AddAsync(px, m_totalModelSize); - m_workerArray->Get(py, m_totalModelSize); + ElemType* px = m_deltaArray; + ElemType* py = m_cpuAsyncBuffer[t_cacheIdx]; + m_workerArray->AddAsync(px, m_totalModelSize); + m_workerArray->Get(py, m_totalModelSize); - }); + }); #endif - } - else - { - m_reportTimer.Restart(); - float factor = DecayCoefficient(); - i = 0; - for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) - { - ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); - Microsoft::MSR::CNTK::Matrix &mat = node->Value(); + } + else + { + m_reportTimer.Restart(); + float factor = DecayCoefficient(); + i = 0; + for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) + { + ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); + Microsoft::MSR::CNTK::Matrix &mat = node->Value(); - ElemType * px = m_deltaArray + m_tableOffsets[i]; - mat.CopyToArray(px, m_tableLength[i]); - } + ElemType * px = m_deltaArray + m_tableOffsets[i]; + mat.CopyToArray(px, m_tableLength[i]); + } - m_reportTimer.Stop(); - if (m_traceLevel > 3) - { - double time = m_reportTimer.ElapsedSeconds(); - fprintf(stderr, "\t\t -- pullAndRequest, GPU -> CPU time %lf \n", time); - } - std::transform(m_cpuAsyncBuffer[0], m_cpuAsyncBuffer[0] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus()); + 
m_reportTimer.Stop(); + if (m_traceLevel > 3) + { + double time = m_reportTimer.ElapsedSeconds(); + fprintf(stderr, "\t\t -- pullAndRequest, GPU -> CPU time %lf \n", time); + } + std::transform(m_cpuAsyncBuffer[0], m_cpuAsyncBuffer[0] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus()); - // lr decay - if (m_ModelAveragingSGDSimulating) - { - factor = ModelAggregationCoefficient(sampleSinceLastSynced); - std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), factor)); - if (m_traceLevel > 2 && m_syncPerfStats != 0) - { - if (m_parameterSyncCounter % m_syncPerfStats == 0) - ReportPerfStats(m_totalClientNumber * m_sampleSinceLastReport, m_sampleSinceLastReport); - else - m_sampleSinceLastReport += sampleSinceLastSynced; - } + // lr decay + if (m_ModelAveragingSGDSimulating) + { + factor = ModelAggregationCoefficient(sampleSinceLastSynced); + std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), factor)); + if (m_traceLevel > 2 && m_syncPerfStats != 0) + { + if (m_parameterSyncCounter % m_syncPerfStats == 0) + ReportPerfStats(m_totalClientNumber * m_sampleSinceLastReport, m_sampleSinceLastReport); + else + m_sampleSinceLastReport += sampleSinceLastSynced; + } - } - else - { - std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), factor)); - } - m_reportTimer.Restart(); + } + else + { + std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies(), factor)); + } + m_reportTimer.Restart(); - ElemType* px = m_deltaArray; - ElemType* py = m_cpuAsyncBuffer[0]; - m_workerArray->AddAsync(px, m_totalModelSize); - m_workerArray->Get(py, m_totalModelSize); + ElemType* px = m_deltaArray; + ElemType* py = m_cpuAsyncBuffer[0]; + m_workerArray->AddAsync(px, m_totalModelSize); + m_workerArray->Get(py, m_totalModelSize); - m_reportTimer.Stop(); - if (m_traceLevel > 3) - { 
- double time = m_reportTimer.ElapsedSeconds(); - fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time); - } - m_reportTimer.Restart(); - i = 0; - for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) - { - ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); - Microsoft::MSR::CNTK::Matrix &mat = node->Value(); + m_reportTimer.Stop(); + if (m_traceLevel > 3) + { + double time = m_reportTimer.ElapsedSeconds(); + fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time); + } + m_reportTimer.Restart(); + i = 0; + for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) + { + ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); + Microsoft::MSR::CNTK::Matrix &mat = node->Value(); - ElemType * px = m_cpuAsyncBuffer[0] + m_tableOffsets[i]; - mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), px); - } - m_reportTimer.Stop(); - if (m_traceLevel > 3) - { - double time = m_reportTimer.ElapsedSeconds(); - fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time); - } - } - return true; - } + ElemType * px = m_cpuAsyncBuffer[0] + m_tableOffsets[i]; + mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), px); + } + m_reportTimer.Stop(); + if (m_traceLevel > 3) + { + double time = m_reportTimer.ElapsedSeconds(); + fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time); + } + } + return true; + } - void WaitAll() override - { - multiverso::MV_Barrier(); - } + void WaitAll() override + { + multiverso::MV_Barrier(); + } - void WaitAsyncBuffer() override - { - if (m_aysncBufferThread != nullptr && m_aysncBufferThread->joinable()) - { - m_aysncBufferThread->join(); - delete m_aysncBufferThread; - m_aysncBufferThread = nullptr; - } - } + void WaitAsyncBuffer() override + { + if (m_aysncBufferThread != nullptr && m_aysncBufferThread->joinable()) + { + 
m_aysncBufferThread->join(); + delete m_aysncBufferThread; + m_aysncBufferThread = nullptr; + } + } private: - void MultiversoInit(const std::list & learnableNodes) - { - assert(!m_isInitialized); - m_isInitialized = true; + void MultiversoInit(const std::list & learnableNodes) + { + assert(!m_isInitialized); + m_isInitialized = true; - // parameter server offer vary of updaters, we only use the SGD updater for this simple case. - multiverso::SetCMDFlag(std::string("updater_type"), std::string("sgd")); - multiverso::MV_Init(); + // parameter server offer vary of updaters, we only use the SGD updater for this simple case. + multiverso::SetCMDFlag(std::string("updater_type"), std::string("sgd")); + multiverso::MV_Init(); - int i = 0; - for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) - { - ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); - Matrix &mat = node->Value(); - size_t layerSize = mat.GetNumElements(); + int i = 0; + for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++) + { + ComputationNodePtr node = dynamic_pointer_cast>(*nodeIter); + Matrix &mat = node->Value(); + size_t layerSize = mat.GetNumElements(); - m_tableLength.push_back(layerSize); - } + m_tableLength.push_back(layerSize); + } - m_tableCount = m_tableLength.size(); + m_tableCount = m_tableLength.size(); - // cacluate total of learnable node's size - m_totalModelSize = accumulate(m_tableLength.begin(), m_tableLength.end(), 0); + // cacluate total of learnable node's size + m_totalModelSize = accumulate(m_tableLength.begin(), m_tableLength.end(), 0); - multiverso::MV_Barrier(); + multiverso::MV_Barrier(); - size_t idx = 0; - for (size_t len : m_tableLength) - { - m_tableOffsets.push_back(idx); - idx += len; - } + size_t idx = 0; + for (size_t len : m_tableLength) + { + m_tableOffsets.push_back(idx); + idx += len; + } #ifndef CPUONLY - for (int i = 0; i < m_localBufferNum; i++) - 
m_gpuAsyncBuffer[i].reserve(m_tableCount); + for (int i = 0; i < m_localBufferNum; i++) + m_gpuAsyncBuffer[i].reserve(m_tableCount); - // create pinned memory - for (int i = 0; i < m_localBufferNum; ++i) - CudaErrorCheck(cudaMallocHost((void **)&m_cpuAsyncBuffer[i], sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable)); + // create pinned memory + for (int i = 0; i < m_localBufferNum; ++i) + CudaErrorCheck(cudaMallocHost((void **)&m_cpuAsyncBuffer[i], sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable)); - CudaErrorCheck(cudaMallocHost((void **)&m_deltaArray, sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable)); + CudaErrorCheck(cudaMallocHost((void **)&m_deltaArray, sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable)); #else - for (int i = 0; i < m_localBufferNum; i++) - m_cpuAsyncBuffer[i] = new ElemType[m_totalModelSize]; + for (int i = 0; i < m_localBufferNum; i++) + m_cpuAsyncBuffer[i] = new ElemType[m_totalModelSize]; #endif - } + } - float DecayCoefficient() - { - float f = 1.f; - switch (m_adjustLearningRateAtBeginningType) - { - case AdjustLearningRateAtBeginning::None: - break; - case AdjustLearningRateAtBeginning::Linearly: - f = min(f, max(0.f, (float)(m_adjustCoefficient + (1 - m_adjustCoefficient) / m_adjustMBNumber * m_parameterSyncCounter))); - break; - case AdjustLearningRateAtBeginning::Staircase: - f = min(f, max(0.f, (float)(m_adjustCoefficient * (m_parameterSyncCounter / m_adjustMBNumber + 1)))); - break; - default: - break; - } - return f; - } + float DecayCoefficient() + { + float f = 1.f; + switch (m_adjustLearningRateAtBeginningType) + { + case AdjustLearningRateAtBeginning::None: + break; + case AdjustLearningRateAtBeginning::Linearly: + f = min(f, max(0.f, (float)(m_adjustCoefficient + (1 - m_adjustCoefficient) / m_adjustMBNumber * m_parameterSyncCounter))); + break; + case AdjustLearningRateAtBeginning::Staircase: + f = min(f, max(0.f, (float)(m_adjustCoefficient * (m_parameterSyncCounter / 
m_adjustMBNumber + 1)))); + break; + default: + break; + } + return f; + } - float ModelAggregationCoefficient(size_t samplesSinceLastSync) - { - float factor = 0; - int nTotalSamples = samplesSinceLastSync; - // TODO[qiwye] will conflict with multiverso - // m_pMPI->AllReduce(&nTotalSamples, 1); + float ModelAggregationCoefficient(size_t samplesSinceLastSync) + { + float factor = 0; + int nTotalSamples = samplesSinceLastSync; + // TODO[qiwye] will conflict with multiverso + // m_pMPI->AllReduce(&nTotalSamples, 1); - if (nTotalSamples <= 0) - { - factor = 1.0f / m_pMPI->NumNodesInUse(); - // give an estimated one - } - else - { - factor = (samplesSinceLastSync + 0.0f) / nTotalSamples; - } - factor = 1.0f / m_pMPI->NumNodesInUse(); - return factor; - } + if (nTotalSamples <= 0) + { + factor = 1.0f / m_pMPI->NumNodesInUse(); + // give an estimated one + } + else + { + factor = (samplesSinceLastSync + 0.0f) / nTotalSamples; + } + factor = 1.0f / m_pMPI->NumNodesInUse(); + return factor; + } - inline void transpose(ElemType *src, ElemType *dst, const int N, const int M) - { - for (auto n = 0; n < N*M; n++) { - auto i = n / N; - auto j = n%N; - dst[n] = src[M*j + i]; - } - } + inline void transpose(ElemType *src, ElemType *dst, const int N, const int M) + { + for (auto n = 0; n < N*M; n++) { + auto i = n / N; + auto j = n%N; + dst[n] = src[M*j + i]; + } + } - void ReportPerfStats(size_t totalSamplesProcessedSinceLastReport, - size_t localSamplesProcessedSinceLastReport) - { - m_reportTimer.Stop(); - double secondsSinceLastReport = m_reportTimer.ElapsedSeconds(); - m_reportTimer.Restart(); + void ReportPerfStats(size_t totalSamplesProcessedSinceLastReport, + size_t localSamplesProcessedSinceLastReport) + { + m_reportTimer.Stop(); + double secondsSinceLastReport = m_reportTimer.ElapsedSeconds(); + m_reportTimer.Restart(); - float totalThroughput = secondsSinceLastReport > 0 ? 
(float)totalSamplesProcessedSinceLastReport / ((float)secondsSinceLastReport * 1000.0f) : 0.0f; - float throughputPerWorker = totalThroughput / m_totalClientNumber; + float totalThroughput = secondsSinceLastReport > 0 ? (float)totalSamplesProcessedSinceLastReport / ((float)secondsSinceLastReport * 1000.0f) : 0.0f; + float throughputPerWorker = totalThroughput / m_totalClientNumber; - string prefix = "\t\t(sim-model aggregation stats) %d-th sync: %8.2f seconds since last report ; %d samples processed by %d workers (%d by me);\n" - "\t\t(sim-model aggregation stats) %d-th sync: totalThroughput = %.2fk samplesPerSecond , throughputPerWorker = %.2fk samplesPerSecond\n"; - fprintf(stderr, prefix.c_str(), (int)m_parameterSyncCounter, secondsSinceLastReport, (int)totalSamplesProcessedSinceLastReport, (int)m_totalClientNumber, (int)localSamplesProcessedSinceLastReport, - (int)m_parameterSyncCounter, totalThroughput, throughputPerWorker); - m_sampleSinceLastReport = 0; + string prefix = "\t\t(sim-model aggregation stats) %d-th sync: %8.2f seconds since last report ; %d samples processed by %d workers (%d by me);\n" + "\t\t(sim-model aggregation stats) %d-th sync: totalThroughput = %.2fk samplesPerSecond , throughputPerWorker = %.2fk samplesPerSecond\n"; + fprintf(stderr, prefix.c_str(), (int)m_parameterSyncCounter, secondsSinceLastReport, (int)totalSamplesProcessedSinceLastReport, (int)m_totalClientNumber, (int)localSamplesProcessedSinceLastReport, + (int)m_parameterSyncCounter, totalThroughput, throughputPerWorker); + m_sampleSinceLastReport = 0; - } + } - multiverso::ArrayServer* m_serverArray; - multiverso::ArrayWorker* m_workerArray; + multiverso::ArrayServer* m_serverArray; + multiverso::ArrayWorker* m_workerArray; - thread * m_aysncBufferThread; - bool m_isInitialized; - bool m_doesEveryNodesShouldSynced; - bool m_ModelAveragingSGDSimulating; + thread * m_aysncBufferThread; + bool m_isInitialized; + bool m_doesEveryNodesShouldSynced; + bool 
m_ModelAveragingSGDSimulating; - int m_totalClientNumber; - int m_traceLevel; - int m_syncPerfStats; - Timer m_reportTimer; - size_t m_parameterSyncCounter; - size_t m_sampleSinceLastReport; + int m_totalClientNumber; + int m_traceLevel; + int m_syncPerfStats; + Timer m_reportTimer; + size_t m_parameterSyncCounter; + size_t m_sampleSinceLastReport; - bool m_useAsyncBuffered; - int m_localBufferNum; - int * m_bufferSwapIndex; - int m_bufferIndexInUse; - std::vector m_getOptions; // used by sparse table - std::vector m_addOptions; // used by sparse table + bool m_useAsyncBuffered; + int m_localBufferNum; + int * m_bufferSwapIndex; + int m_bufferIndexInUse; + std::vector m_getOptions; // used by sparse table + std::vector m_addOptions; // used by sparse table - AdjustLearningRateAtBeginning m_adjustLearningRateAtBeginningType; - double m_adjustCoefficient; - size_t m_adjustMBNumber; + AdjustLearningRateAtBeginning m_adjustLearningRateAtBeginningType; + double m_adjustCoefficient; + size_t m_adjustMBNumber; - vector m_tableLength; - size_t m_totalModelSize; - vector m_tableOffsets; - //shared_ptr m_deltaArray; - ElemType * m_deltaArray; - //std::vector > m_cpuAsyncBuffer; - ElemType ** m_cpuAsyncBuffer; + vector m_tableLength; + size_t m_totalModelSize; + vector m_tableOffsets; + //shared_ptr m_deltaArray; + ElemType * m_deltaArray; + //std::vector > m_cpuAsyncBuffer; + ElemType ** m_cpuAsyncBuffer; - MPIWrapperPtr m_pMPI; + MPIWrapperPtr m_pMPI; - // GPU double buffer - std::vector >> m_gpuAsyncBuffer; - int m_tableCount; + // GPU double buffer + std::vector >> m_gpuAsyncBuffer; + int m_tableCount; #ifndef CPUONLY - cudaStream_t _commStream; + cudaStream_t _commStream; #endif }; // Class MultiversoHelper