Merge branch 'qiwye/asgd-dev' of https://github.com/Microsoft/CNTK into qiwye/asgd-dev

Qiwei Ye 2016-11-01 20:42:49 +08:00
Parents f8f66384c3 10428b12fb
Commit 5f8e9c6385
10 changed files with 719 additions and 843 deletions

View file

@@ -483,6 +483,7 @@ SGDLIB_SRC=\
$(SOURCEDIR)/SGDLib/Profiler.cpp \
$(SOURCEDIR)/SGDLib/SGD.cpp \
$(SOURCEDIR)/SGDLib/PostComputingActions.cpp \
$(SOURCEDIR)/SGDLib/ASGDHelper.cpp \
SGDLIB_SRC+=$(CNTKLIBRARY_COMMON_SRC)

View file

@@ -1,29 +0,0 @@
//
// <copyright file="ASGDCommon.h" company="Microsoft">
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
// </copyright>
//
#pragma once
namespace Microsoft { namespace MSR { namespace CNTK {
// -----------------------------------------------------------------------
// class AdjustLearningRateAtBeginning
// Provides options for DataParallelASGD training, so that every node
// can adjust its learning rate per minibatch during the first N epochs.
// -----------------------------------------------------------------------
// TODO: We can remove these options once we can adjust the learning rate at the minibatch level
enum class AdjustLearningRateAtBeginning : int
{
None = 0, // default, don't adjust learning rate
Linearly = 1, // linear adjustment: the learning rate ramps from 0 up to learningRatesPerMB
Staircase = (1 << 1), // staircase adjustment: the learning rate steps from 0 up to learningRatesPerMB every adjustNbMinibatch minibatches
};
}}}
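For concreteness, here is a minimal sketch of how these two warm-up modes turn into a per-sync scaling factor. It mirrors DecayCoefficient() in the Multiverso helper further down; the standalone function name and form are illustrative.

#include <algorithm>

// Sketch: scaling factor applied to the learning rate at sync step
// `syncCounter`, given the warm-up mode and its two options.
float WarmupFactor(AdjustLearningRateAtBeginning type,
                   double adjustCoef, size_t adjustPerMinibatches, size_t syncCounter)
{
    float f = 1.f;
    switch (type)
    {
    case AdjustLearningRateAtBeginning::None:
        break;                            // full learning rate from the start
    case AdjustLearningRateAtBeginning::Linearly:
        // ramps from adjustCoef up to 1 over adjustPerMinibatches syncs
        f = std::min(f, std::max(0.f, (float)(adjustCoef + (1 - adjustCoef) / adjustPerMinibatches * syncCounter)));
        break;
    case AdjustLearningRateAtBeginning::Staircase:
        // grows by adjustCoef every adjustPerMinibatches syncs, capped at 1
        f = std::min(f, std::max(0.f, (float)(adjustCoef * (syncCounter / adjustPerMinibatches + 1))));
        break;
    }
    return f;
}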

View file

@@ -0,0 +1,69 @@
//
// <copyright file="ASGDHelper.h" company="Microsoft">
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
// </copyright>
//
#pragma once
#include <list>
#include "MPIWrapper.h"
#include "ComputationNetwork.h"
namespace Microsoft { namespace MSR { namespace CNTK {
// -----------------------------------------------------------------------
// class AdjustLearningRateAtBeginning
// Provides options for DataParallelASGD training, so that every node
// can adjust its learning rate per minibatch during the first N epochs.
// -----------------------------------------------------------------------
// TODO: We can remove these options once we can adjust the learning rate at the minibatch level
enum class AdjustLearningRateAtBeginning : int
{
None = 0, // default, don't adjust learning rate
Linearly = 1, // linear adjustment: the learning rate ramps from 0 up to learningRatesPerMB
Staircase = (1 << 1), // staircase adjustment: the learning rate steps from 0 up to learningRatesPerMB every adjustNbMinibatch minibatches
};
template<class ElemType = float>
class ASGDHelper
{
typedef shared_ptr<ComputationNode<ElemType>> ComputationNodePtr;
public:
virtual ~ASGDHelper() { }
// -----------------------------------------------------------------------
// InitModel() -- Uploads the initialized model (pre-computed by CNTK logic)
// to the parameter servers, so that every node starts training from the same model
// -----------------------------------------------------------------------
virtual void InitModel(const std::list<ComputationNodeBasePtr> & learnableNodes) = 0;
// -----------------------------------------------------------------------
// PushAndPullModel() -- Pushes parameters of learnableNodes to the parameter servers, then gets the latest model back.
// -----------------------------------------------------------------------
virtual bool PushAndPullModel(const std::list<ComputationNodeBasePtr> & learnableNodes, size_t sampleSinceLastSynced = 0) = 0;
// -----------------------------------------------------------------------
// WaitAll() -- Waits (barrier) until all other nodes reach this point
// -----------------------------------------------------------------------
virtual void WaitAll() = 0;
virtual void WaitAsyncBuffer() = 0;
}; // Class ASGDHelper
// Factory method to create an ASGDHelper instance
template<class ElemType = float>
ASGDHelper<ElemType>* NewASGDHelper(
const std::list<ComputationNodeBasePtr> & learnableNodes, // Parameters that need to be trained
size_t nodeNumRanks, // Number of working nodes
bool useAsyncBuffered = true, // Use an asynchronous buffer to hide communication cost
bool isSimulatedModelAveragingSGD = false, // Use parameter-server-based model averaging rather than ASGD
AdjustLearningRateAtBeginning adjusttype =
AdjustLearningRateAtBeginning::None, // Adjust the learning rate per minibatch at the very beginning of training
double adjustCoef = 0.2, // see DecayCoefficient()
size_t adjustPerMinibatches = 600, // adjustment interval, in minibatches; see DecayCoefficient()
int traceLevel = 0, // log level
int syncPerfStats = 0, // show perf data every syncPerfStats syncs
const MPIWrapperPtr& pMPI = nullptr);
}}}
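A minimal usage sketch of this factory, assuming the caller already holds the learnable nodes, an MPI wrapper pMPI, and a samplesSinceLastSync counter (the real call site is in SGD.cpp, shown further down):

std::shared_ptr<ASGDHelper<float>> helper(
    NewASGDHelper<float>(learnableNodes,        // from the computation network
                         pMPI->NumNodesInUse(), // number of workers
                         /*useAsyncBuffered=*/ true));
helper->InitModel(learnableNodes); // everyone starts from the same averaged model
helper->WaitAll();                 // barrier before training begins
// ... inside the training loop, once enough samples have accumulated:
helper->PushAndPullModel(learnableNodes, samplesSinceLastSync);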

View file

@@ -1,737 +0,0 @@
//
// <copyright file="MultiversoWrapper.h" company="Microsoft">
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
// </copyright>
//
#pragma once
// the header files located in Source\Multiverso\include
#include <multiverso/multiverso.h>
#include <multiverso/util/log.h>
#include <multiverso/util/configure.h>
#ifdef MULTIVERSO_USE_MATRIXTABLE
#include <multiverso/table/matrix.h>
#else
#include <multiverso/table/array_table.h>
#endif
#include <multiverso/updater/updater.h>
#pragma comment(lib, "Multiverso.lib")
#ifndef CPUONLY
#include <cuda_runtime.h>
#pragma comment (lib, "cudart.lib") // for cudaMemcpyAsync()
#endif
// TODO: test for the model aggregation
#include "MPIWrapper.h"
#include "ComputationNetwork.h"
#include "TimerUtility.h"
#include "ASGDCommon.h"
#include <functional>
#include <thread>
#include <unordered_map>
#include <numeric>
#include <algorithm>
namespace Microsoft { namespace MSR { namespace CNTK {
// #define MULTIVERSO_DEBUG
#ifndef CPUONLY
#define CudaErrorCheck(ans) { gpuAssert((ans), __FILE__, __LINE__); }
inline void gpuAssert(cudaError_t code, const char *file, int line, bool abort = true)
{
if (code != cudaSuccess)
{
fprintf(stderr, "GPUassert: %s %s %d\n", cudaGetErrorString(code), file, line);
if (abort) exit(code);
}
}
#endif
template<class ElemType = float>
class MultiversoHelper
{
typedef shared_ptr<ComputationNode<ElemType>> ComputationNodePtr;
public:
// -----------------------------------------------------------------------
// MultiversoHelper() -- Constructor for MultiversoHelper.
// -----------------------------------------------------------------------
MultiversoHelper(const std::list<ComputationNodeBasePtr> & learnableNodes, // Parameters that need to be trained
size_t nodeNumRanks, // Number of working nodes
bool useAsyncBuffered = true, // Use an asynchronous buffer to hide communication cost
bool isSimulatedModelAveragingSGD = false, // Use parameter-server-based MA rather than ASGD
AdjustLearningRateAtBeginning adjusttype =
AdjustLearningRateAtBeginning::None, // Adjust the learning rate per minibatch at the very beginning of training,
// which can be used to tackle the instability of ASGD
double adjustCoef = 0.2, // see DecayCoefficient()
size_t adjustPerMinibatches = 600, // adjustment interval, in minibatches
int traceLevel = 0, // log level
int syncPerfStats = 0, // show perf data every syncPerfStats syncs
const MPIWrapperPtr& pMPI = nullptr)
: m_parameterSyncCounter(0), m_adjustLearningRateAtBeginningType(adjusttype),
m_adjustCoefficient(adjustCoef), m_adjustMBNumber(adjustPerMinibatches),
m_totalClientNumber(nodeNumRanks), m_useAsyncBuffered(useAsyncBuffered),
m_traceLevel(traceLevel), m_ModelAveragingSGDSimulating(isSimulatedModelAveragingSGD), m_doesEveryNodesShouldSynced(false),
m_pMPI(pMPI), m_syncPerfStats(syncPerfStats)
{
if (m_ModelAveragingSGDSimulating)
{
m_doesEveryNodesShouldSynced = true;
m_useAsyncBuffered = false;
}
// Pipeline-related variables
m_localBufferNum = m_useAsyncBuffered ? 2 : 1;
m_bufferSwapIndex = new int[m_localBufferNum];
// CPU asynchronous buffer
m_cpuAsyncBuffer = new ElemType*[m_localBufferNum];
// Get/Add options used by the Multiverso sparse update
m_getOptions.reserve(m_localBufferNum);
m_addOptions.reserve(m_localBufferNum);
#ifndef CPUONLY
// GPU asynchronous buffer
m_gpuAsyncBuffer.resize(m_localBufferNum);
// create a communication stream for data transfer between GPU and CPU
CudaErrorCheck(cudaStreamCreate(&_commStream));
#endif
m_bufferIndexInUse = 0;
for (int i = 0; i < m_localBufferNum; i++)
m_bufferSwapIndex[i] = (i + 1) % m_localBufferNum;
m_aysncBufferThread = nullptr;
if (m_traceLevel > 5)
multiverso::Log::ResetLogLevel(multiverso::LogLevel::Debug);
else if (m_traceLevel > 4)
multiverso::Log::ResetLogLevel(multiverso::LogLevel::Error);
if (m_doesEveryNodesShouldSynced)
multiverso::SetCMDFlag("sync", true);
MultiversoInit(learnableNodes);
}
// -----------------------------------------------------------------------
// ~MultiversoHelper() -- Destructor for MultiversoHelper.
// -----------------------------------------------------------------------
~MultiversoHelper()
{
fprintf(stderr, "~MultiversoHelper\n");
fflush(stderr);
if (m_useAsyncBuffered && m_aysncBufferThread != nullptr && m_aysncBufferThread->joinable())
m_aysncBufferThread->join();
delete[] m_bufferSwapIndex;
#ifndef CPUONLY
CudaErrorCheck(cudaFreeHost(m_deltaArray)); // pinned memory from cudaMallocHost
#else
delete[] m_deltaArray;
#endif
for (size_t i = 0; i < m_localBufferNum; i++)
{
#ifndef CPUONLY
CudaErrorCheck(cudaFreeHost(m_cpuAsyncBuffer[i]));
#else
delete[] m_cpuAsyncBuffer[i];
#endif
}
delete[] m_cpuAsyncBuffer;
#ifndef CPUONLY
CudaErrorCheck(cudaStreamDestroy(_commStream));
#endif
multiverso::MV_ShutDown(false);
}
// -----------------------------------------------------------------------
// InitModel() -- Uploads the initialized model (pre-computed by CNTK logic)
// to the parameter servers, so that every node starts training from the same model
// -----------------------------------------------------------------------
void InitModel(const std::list<ComputationNodeBasePtr> & learnableNodes)
{
float factor = 1.0f / m_totalClientNumber;
int i = 0; // index of the current learnable node
for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++)
{
ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
Matrix<ElemType> &mat = node->Value();
#ifndef CPUONLY
for (int j = 0; j < m_localBufferNum; j++)
m_gpuAsyncBuffer[j].push_back(mat.DeepClone());
#endif
ElemType* px = m_cpuAsyncBuffer[0] + m_tableOffsets[i];
mat.CopyToArray(px, m_tableLength[i]);
}
for (int i = 1; i < m_localBufferNum; i++)
memcpy(m_cpuAsyncBuffer[i], m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize);
memcpy(m_deltaArray, m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize);
// Because the parameter server subtracts the delta it receives, each worker sends its initial model negated and scaled by 1/N, so the server ends up holding the average of the initial models.
std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), -factor));
// Using matrix_table in Multiverso makes it possible for CNTK to sync with the parameter server layer by layer,
// but it is also about 10% slower than array_table.
#ifdef MULTIVERSO_USE_MATRIXTABLE
for (int widx = 0; widx < m_tableCount; widx++)
{
if (m_isSparseArray[widx])
{
auto multiversoMatrix = m_matrixMap->at(widx);
ElemType* px = m_deltaArray + m_tableOffsets[widx];
multiversoMatrix->Add(px, m_tableLength[widx], m_addOptions[0]);
multiversoMatrix->Get(px, m_tableLength[widx], m_getOptions[0]);
WaitAll();
multiversoMatrix->Get(px, m_tableLength[widx], m_getOptions[0]);
}
else
{
auto multiversoMatrix = m_matrixMap->at(widx);
ElemType* px = m_deltaArray + m_tableOffsets[widx];
multiversoMatrix->Add(px, m_tableLength[widx]);
multiversoMatrix->Get(px, m_tableLength[widx]);
WaitAll();
multiversoMatrix->Get(px, m_tableLength[widx]);
}
}
#else
m_workerArray->Add(m_deltaArray, m_totalModelSize);
m_workerArray->Get(m_deltaArray, m_totalModelSize);
WaitAll();
m_workerArray->Get(m_deltaArray, m_totalModelSize);
#endif
if (std::equal(m_deltaArray, m_deltaArray + m_totalModelSize, m_cpuAsyncBuffer[0]))
multiverso::Log::Info("multiverso initial model loaded.\n");
m_reportTimer.Start();
}
// -----------------------------------------------------------------------
// PushAndPullModel() -- Pushes parameters of learnableNodes to the parameter servers, then gets the latest model back.
// -----------------------------------------------------------------------
bool PushAndPullModel(const std::list<ComputationNodeBasePtr> & learnableNodes, size_t sampleSinceLastSynced = 0)
{
m_parameterSyncCounter++;
double fromCPUToGPUTime;
double fromGPUToCPUTime;
double networkTime;
double swapTimeOnGPU;
m_reportTimer.Restart();
WaitAsyncBuffer();
m_reportTimer.Stop();
// reset statistics for profiling
if (m_traceLevel > 2 && m_syncPerfStats > 0 && m_parameterSyncCounter % m_syncPerfStats == 0)
{
fromCPUToGPUTime = 0;
fromGPUToCPUTime = 0;
networkTime = 0;
swapTimeOnGPU = 0;
}
m_bufferIndexInUse = m_bufferSwapIndex[m_bufferIndexInUse];
int i = 0; // index of the current learnable node
if (m_useAsyncBuffered)
{
m_reportTimer.Restart();
for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++)
{
ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
Microsoft::MSR::CNTK::Matrix<ElemType> &mat = node->Value();
#ifndef CPUONLY
// CNTK model -> GPU buffer
CudaErrorCheck(cudaMemcpy(m_gpuAsyncBuffer[m_bufferIndexInUse][i].Data(),
mat.Data(),
mat.GetNumElements() * sizeof(ElemType),
cudaMemcpyDeviceToDevice));
// GPU buffer -> CNTK model
CudaErrorCheck(cudaMemcpy(mat.Data(),
m_gpuAsyncBuffer[m_bufferSwapIndex[m_bufferIndexInUse]][i].Data(),
mat.GetNumElements() * sizeof(ElemType),
cudaMemcpyDeviceToDevice));
#else
ElemType * px = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[i];
mat.CopyToArray(px, m_tableLength[i]);
ElemType * py = m_cpuAsyncBuffer[m_bufferSwapIndex[m_bufferIndexInUse]] + m_tableOffsets[i];
mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), py);
// px and py point into m_cpuAsyncBuffer and must not be freed here
#endif
}
m_reportTimer.Stop();
if (m_traceLevel > 2)
{
swapTimeOnGPU = m_reportTimer.ElapsedSeconds();
}
#ifndef CPUONLY
m_aysncBufferThread = new thread([&]()
{
float factor = DecayCoefficient();
int deviceId = m_gpuAsyncBuffer[m_bufferIndexInUse][0].GetDeviceId();
CudaErrorCheck(cudaSetDevice(deviceId));
Timer threadTimer;
threadTimer.Restart();
for (int widx = 0; widx < m_tableCount; widx++)
{
ElemType * px = m_deltaArray + m_tableOffsets[widx];
// GPU buffer -> CPU buffer
CudaErrorCheck(cudaMemcpyAsync(px,
m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(),
m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType),
cudaMemcpyDeviceToHost,
_commStream));
}
// wait until the copy from GPU to CPU has finished
CudaErrorCheck(cudaStreamSynchronize(_commStream));
threadTimer.Stop();
if (m_traceLevel > 3)
{
double time = threadTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, GPU -> CPU time %lf \n", time);
}
// delta = local model - last synced model (i.e., the accumulated gradient * learning rate)
std::transform(m_cpuAsyncBuffer[m_bufferIndexInUse],
m_cpuAsyncBuffer[m_bufferIndexInUse] + m_totalModelSize,
m_deltaArray, m_deltaArray,
std::minus<ElemType>());
threadTimer.Restart();
// lr decay
std::transform(m_deltaArray,
m_deltaArray + m_totalModelSize,
m_deltaArray,
std::bind1st(std::multiplies<ElemType>(), factor));
#ifdef MULTIVERSO_USE_MATRIXTABLE
for (int widx = 0; widx < m_tableCount; widx++)
{
if (m_isSparseArray[widx])
{
auto multiversoMatrix = m_matrixMap->at(widx);
ElemType* px = m_deltaArray + m_tableOffsets[widx];
ElemType* py = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[widx];
multiversoMatrix->Add(px, m_tableLength[widx], m_addOptions[m_bufferIndexInUse]);
multiversoMatrix->Get(py, m_tableLength[widx], m_getOptions[m_bufferIndexInUse]);
}
else
{
auto multiversoMatrix = m_matrixMap->at(widx);
ElemType* px = m_deltaArray + m_tableOffsets[widx];
ElemType* py = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[widx];
multiversoMatrix->Add(px, m_tableLength[widx]);
multiversoMatrix->Get(py, m_tableLength[widx]);
}
}
#else
ElemType* px = m_deltaArray;
ElemType* py = m_cpuAsyncBuffer[m_bufferIndexInUse];
m_workerArray->AddAsync(px, m_totalModelSize);
m_workerArray->Get(py, m_totalModelSize);
#endif
threadTimer.Stop();
if (m_traceLevel > 3)
{
double time = threadTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time);
}
threadTimer.Restart();
// copy parameters from CPU buffer to GPU buffer
for (int widx = 0; widx < m_tableCount; widx++)
{
ElemType * py = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[widx];
CudaErrorCheck(cudaMemcpyAsync(m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(),
py,
m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType),
cudaMemcpyHostToDevice,
_commStream));
}
CudaErrorCheck(cudaStreamSynchronize(_commStream));
threadTimer.Stop();
if (m_traceLevel > 3)
{
double time = threadTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time);
}
});
#else
m_aysncBufferThread = new thread([&]()
{
float factor = DecayCoefficient();
int t_cacheIdx = m_bufferIndexInUse;
std::transform(m_cpuAsyncBuffer[t_cacheIdx], m_cpuAsyncBuffer[t_cacheIdx] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus<ElemType>());
std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), factor));
#ifdef MULTIVERSO_USE_MATRIXTABLEM
for (int widx = 0; widx < m_tableCount; widx++)
{
auto multiversoMatrix = m_matrixMap->at(widx);
ElemType* px = m_deltaArray + m_tableOffsets[widx];
ElemType* py = m_cpuAsyncBuffer[t_cacheIdx] + m_tableOffsets[widx];
multiversoMatrix->Add(px, m_tableLength[widx], m_addOptions[t_cacheIdx]);
multiversoMatrix->Get(py, m_tableLength[widx], m_getOptions[t_cacheIdx]);
}
#else
ElemType* px = m_deltaArray;
ElemType* py = m_cpuAsyncBuffer[t_cacheIdx];
m_workerArray->AddAsync(px, m_totalModelSize);
m_workerArray->Get(py, m_totalModelSize);
#endif
});
#endif
}
else
{
m_reportTimer.Restart();
float factor = DecayCoefficient();
i = 0;
for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++)
{
ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
Microsoft::MSR::CNTK::Matrix<ElemType> &mat = node->Value();
ElemType * px = m_deltaArray + m_tableOffsets[i];
mat.CopyToArray(px, m_tableLength[i]);
}
m_reportTimer.Stop();
if (m_traceLevel > 3)
{
double time = m_reportTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, GPU -> CPU time %lf \n", time);
}
std::transform(m_cpuAsyncBuffer[0], m_cpuAsyncBuffer[0] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus<ElemType>());
// lr decay
if (m_ModelAveragingSGDSimulating)
{
factor = ModelAggregationCoefficient(sampleSinceLastSynced);
std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), factor));
if (m_traceLevel > 2 && m_syncPerfStats != 0)
{
if (m_parameterSyncCounter % m_syncPerfStats == 0)
ReportPerfStats(m_totalClientNumber * m_sampleSinceLastReport, m_sampleSinceLastReport);
else
m_sampleSinceLastReport += sampleSinceLastSynced;
}
}
else
{
std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), factor));
}
m_reportTimer.Restart();
#ifdef MULTIVERSO_USE_MATRIXTABLE
for (int widx = 0; widx < m_tableCount; widx++)
{
if (m_isSparseArray[widx])
{
auto multiversoMatrix = m_matrixMap->at(widx);
ElemType* px = m_deltaArray + m_tableOffsets[widx];
ElemType* py = m_cpuAsyncBuffer[0] + m_tableOffsets[widx];
multiversoMatrix->Add(px, m_tableLength[widx], m_addOptions[0]);
multiversoMatrix->Get(py, m_tableLength[widx], m_getOptions[0]);
}
else
{
auto multiversoMatrix = m_matrixMap->at(widx);
ElemType* px = m_deltaArray + m_tableOffsets[widx];
ElemType* py = m_cpuAsyncBuffer[0] + m_tableOffsets[widx];
multiversoMatrix->Add(px, m_tableLength[widx]);
multiversoMatrix->Get(py, m_tableLength[widx]);
}
}
#else
ElemType* px = m_deltaArray;
ElemType* py = m_cpuAsyncBuffer[0];
m_workerArray->AddAsync(px, m_totalModelSize);
m_workerArray->Get(py, m_totalModelSize);
#endif
m_reportTimer.Stop();
if (m_traceLevel > 3)
{
double time = m_reportTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time);
}
m_reportTimer.Restart();
i = 0;
for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++)
{
ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
Microsoft::MSR::CNTK::Matrix<ElemType> &mat = node->Value();
ElemType * px = m_cpuAsyncBuffer[0] + m_tableOffsets[i];
mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), px);
}
m_reportTimer.Stop();
if (m_traceLevel > 3)
{
double time = m_reportTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time);
}
}
return true;
}
// -----------------------------------------------------------------------
// PushModel() -- Push parameters of learnableNodes to parameter servers
// -----------------------------------------------------------------------
void PushModel(const std::list<ComputationNodeBasePtr> & learnableNode)
{
}
// -----------------------------------------------------------------------
// PullModel() -- Pull parameters of learnableNodes from parameter servers
// -----------------------------------------------------------------------
void PullModel(const std::list<ComputationNodeBasePtr> & learnableNode)
{
}
// -----------------------------------------------------------------------
// WaitAll() -- Wait(Barrier) all the other nodes to process
// -----------------------------------------------------------------------
void WaitAll()
{
multiverso::MV_Barrier();
}
void WaitAsyncBuffer()
{
if (m_aysncBufferThread != nullptr && m_aysncBufferThread->joinable())
{
m_aysncBufferThread->join();
delete m_aysncBufferThread;
m_aysncBufferThread = nullptr;
}
}
private:
// -----------------------------------------------------------------------
// MultiversoInit() -- Initialize Multiverso and set up tables and buffers for the learnable nodes
// -----------------------------------------------------------------------
void MultiversoInit(const std::list<ComputationNodeBasePtr> & learnableNodes)
{
assert(!m_isInitialized);
m_isInitialized = true;
// The parameter server offers a variety of updaters; we only use the SGD updater in this simple case.
multiverso::SetCMDFlag<std::string>(std::string("updater_type"), std::string("sgd"));
multiverso::MV_Init();
#ifdef MULTIVERSO_USE_MATRIXTABLE
for (int i = 0; i < m_localBufferNum; i++)
{
m_getOptions.push_back(new multiverso::GetOption());
m_getOptions.at(i)->set_worker_id(m_localBufferNum * multiverso::MV_WorkerId() + i);
m_addOptions.push_back(new multiverso::AddOption());
m_addOptions.at(i)->set_worker_id(m_localBufferNum * multiverso::MV_WorkerId() + i);
}
m_matrixMap = new std::vector< multiverso::MatrixWorker<ElemType>*>();
m_serverMap = new std::vector< multiverso::MatrixServer<ElemType>*>();
// weights
std::wstring sparse_tag{ L"Sparse" };
#endif
int i = 0;
for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++)
{
ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
Matrix<ElemType> &mat = node->Value();
size_t layerSize = mat.GetNumElements();
#ifdef MULTIVERSO_USE_MATRIXTABLE
size_t layerRowSize = mat.GetNumRows();
size_t layerColSize = mat.GetNumCols();
std::wstring nodeName = node->NodeName();
auto found = nodeName.find(sparse_tag);
m_isSparseArray.push_back(false);
if (found != std::string::npos)
{
m_isSparseArray[i] = true;
fprintf(stderr, "Layer %ls using sparseMatrix. row size: %d, col size: %d\n", nodeName.c_str(), (int)layerColSize, (int)layerRowSize);
fflush(stderr);
m_matrixMap->push_back(new multiverso::MatrixWorker<ElemType>(layerColSize, layerRowSize, true));
m_serverMap->push_back(new multiverso::MatrixServer<ElemType>(layerColSize, layerRowSize, true, m_useAsyncBuffered));
}
else
{
m_isSparseArray[i] = false;
m_matrixMap->push_back(new multiverso::MatrixWorker<ElemType>(layerRowSize, layerColSize, false));
m_serverMap->push_back(new multiverso::MatrixServer<ElemType>(layerRowSize, layerColSize, false, m_useAsyncBuffered));
}
#endif
m_tableLength.push_back(layerSize);
}
m_tableCount = m_tableLength.size();
// calculate the total size of the learnable nodes
m_totalModelSize = accumulate(m_tableLength.begin(), m_tableLength.end(), (size_t)0);
#ifndef MULTIVERSO_USE_MATRIXTABLE
m_serverArray = new multiverso::ArrayServer<ElemType>(m_totalModelSize);
m_workerArray = new multiverso::ArrayWorker<ElemType>(m_totalModelSize);
#endif
multiverso::MV_Barrier();
size_t idx = 0;
for (size_t len : m_tableLength)
{
m_tableOffsets.push_back(idx);
idx += len;
}
#ifndef CPUONLY
for (int i = 0; i < m_localBufferNum; i++)
m_gpuAsyncBuffer[i].reserve(m_tableCount);
// create pinned memory
for (int i = 0; i < m_localBufferNum; ++i)
CudaErrorCheck(cudaMallocHost((void **)&m_cpuAsyncBuffer[i], sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable));
CudaErrorCheck(cudaMallocHost((void **)&m_deltaArray, sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable));
#else
for (int i = 0; i < m_localBufferNum; i++)
m_cpuAsyncBuffer[i] = new ElemType[m_totalModelSize];
m_deltaArray = new ElemType[m_totalModelSize]; // the delta buffer is needed in the CPU-only path as well
#endif
}
float DecayCoefficient()
{
float f = 1.f;
switch (m_adjustLearningRateAtBeginningType)
{
case AdjustLearningRateAtBeginning::None:
break;
case AdjustLearningRateAtBeginning::Linearly:
f = min(f, max(0.f, (float)(m_adjustCoefficient + (1 - m_adjustCoefficient) / m_adjustMBNumber * m_parameterSyncCounter)));
break;
case AdjustLearningRateAtBeginning::Staircase:
f = min(f, max(0.f, (float)(m_adjustCoefficient * (m_parameterSyncCounter / m_adjustMBNumber + 1))));
break;
default:
break;
}
return f;
}
float ModelAggregationCoefficient(size_t samplesSinceLastSync)
{
float factor = 0;
int nTotalSamples = samplesSinceLastSync;
// TODO[qiwye] will conflict with multiverso
// m_pMPI->AllReduce(&nTotalSamples, 1);
if (nTotalSamples <= 0)
{
factor = 1.0f / m_pMPI->NumNodesInUse();
// give an estimated one
}
else
{
factor = (samplesSinceLastSync + 0.0f) / nTotalSamples;
}
factor = 1.0f / m_pMPI->NumNodesInUse(); // for now always fall back to the uniform estimate (see TODO above)
return factor;
}
inline void transpose(ElemType *src, ElemType *dst, const int N, const int M)
{
for (auto n = 0; n < N*M; n++) {
auto i = n / N;
auto j = n%N;
dst[n] = src[M*j + i];
}
}
void ReportPerfStats(size_t totalSamplesProcessedSinceLastReport,
size_t localSamplesProcessedSinceLastReport)
{
m_reportTimer.Stop();
double secondsSinceLastReport = m_reportTimer.ElapsedSeconds();
m_reportTimer.Restart();
float totalThroughput = secondsSinceLastReport > 0 ? (float)totalSamplesProcessedSinceLastReport / ((float)secondsSinceLastReport * 1000.0f) : 0.0f;
float throughputPerWorker = totalThroughput / m_totalClientNumber;
string prefix = "\t\t(sim-model aggregation stats) %d-th sync: %8.2f seconds since last report ; %d samples processed by %d workers (%d by me);\n"
"\t\t(sim-model aggregation stats) %d-th sync: totalThroughput = %.2fk samplesPerSecond , throughputPerWorker = %.2fk samplesPerSecond\n";
fprintf(stderr, prefix.c_str(), (int)m_parameterSyncCounter, secondsSinceLastReport, (int)totalSamplesProcessedSinceLastReport, (int)m_totalClientNumber, (int)localSamplesProcessedSinceLastReport,
(int)m_parameterSyncCounter, totalThroughput, throughputPerWorker);
m_sampleSinceLastReport = 0;
}
#ifdef MULTIVERSO_USE_MATRIXTABLE
std::vector<multiverso::MatrixWorker<ElemType>*>* m_matrixMap;
std::vector<multiverso::MatrixServer<ElemType>*>* m_serverMap;
std::vector<bool> m_isSparseArray;
// TODO(qiwye): use ArrayTable for less communication between servers and workers.
#else
multiverso::ArrayServer<ElemType>* m_serverArray;
multiverso::ArrayWorker<ElemType>* m_workerArray;
#endif
thread * m_aysncBufferThread;
bool m_isInitialized;
bool m_doesEveryNodesShouldSynced;
bool m_ModelAveragingSGDSimulating;
int m_totalClientNumber;
int m_traceLevel;
int m_syncPerfStats;
Timer m_reportTimer;
size_t m_parameterSyncCounter;
size_t m_sampleSinceLastReport;
bool m_useAsyncBuffered;
int m_localBufferNum;
int * m_bufferSwapIndex;
int m_bufferIndexInUse;
std::vector< multiverso::GetOption*> m_getOptions; // used by sparse table
std::vector< multiverso::AddOption*> m_addOptions; // used by sparse table
AdjustLearningRateAtBeginning m_adjustLearningRateAtBeginningType;
double m_adjustCoefficient;
size_t m_adjustMBNumber;
vector<size_t> m_tableLength;
size_t m_totalModelSize;
vector<size_t> m_tableOffsets;
//shared_ptr<ElemType> m_deltaArray;
ElemType * m_deltaArray;
//std::vector<shared_ptr<ElemType> > m_cpuAsyncBuffer;
ElemType ** m_cpuAsyncBuffer;
MPIWrapperPtr m_pMPI;
// GPU double buffer
std::vector<std::vector<Matrix<ElemType> >> m_gpuAsyncBuffer;
int m_tableCount;
#ifndef CPUONLY
cudaStream_t _commStream;
#endif
};
}}}
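The initialization trick in InitModel() above deserves a note: since the server subtracts each pushed delta, every one of the N workers pushes -(1/N)·model, and after all N pushes the server holds the average of the initial models. A toy sketch of that arithmetic with plain arrays (ServerAdd stands in for the Multiverso Add call; all names here are illustrative):

#include <cstddef>
#include <vector>

// The server applies "param -= delta" for every pushed delta.
void ServerAdd(std::vector<float>& serverParam, const std::vector<float>& delta)
{
    for (std::size_t i = 0; i < serverParam.size(); ++i)
        serverParam[i] -= delta[i];
}

// Each of numWorkers workers pushes its initial model negated and scaled
// by 1/numWorkers; the N pushes sum to the average of the initial models.
void PushInitialModel(std::vector<float>& serverParam,
                      const std::vector<float>& localModel, std::size_t numWorkers)
{
    std::vector<float> delta(localModel.size());
    for (std::size_t i = 0; i < delta.size(); ++i)
        delta[i] = -localModel[i] / numWorkers;
    ServerAdd(serverParam, delta);
}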

View file

@@ -1,46 +0,0 @@
//
// <copyright file="NoMultiversoWrapper.h" company="Microsoft">
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
// </copyright>
//
#pragma once
#include "ASGDCommon.h"
namespace Microsoft { namespace MSR { namespace CNTK {
template<class ElemType = float>
class MultiversoHelper
{
public:
MultiversoHelper(const std::list<ComputationNodeBasePtr> & learnableNodes,
int nodeNumRanks,
bool useAsyncBuffered = true,
bool isSimModelAveragingSGD = false,
AdjustLearningRateAtBeginning adjusttype = AdjustLearningRateAtBeginning::None,
double adjustcoef = 0.2,
size_t adjustnbmb = 600,
int traceLevel = 0,
int syncPerfStats = 0,
const MPIWrapperPtr& pMPI = nullptr) { }
~MultiversoHelper() { }
void InitModel(const std::list<ComputationNodeBasePtr> & learnableNode) { }
void PushAndPullModel(const std::list<ComputationNodeBasePtr> & learnableNodes, size_t sampleSinceLastSynced = 0) { }
void PushModel(const std::list<ComputationNodeBasePtr> & learnableNode) { }
void PullModel(const std::list<ComputationNodeBasePtr> & learnableNode) { }
void WaitAll() { }
void WaitAsyncBuffer() { }
};
}
}
}

View file

@@ -0,0 +1,631 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
// ASGDHelper.cpp : Implements the ASGDHelper interface. The implementation is based on Multiverso.
//
#include "ASGDHelper.h"
#include "MPIWrapper.h"
#include "ComputationNetwork.h"
#include "TimerUtility.h"
#include "ASGDCommon.h"
#include <functional>
#include <thread>
#include <unordered_map>
#include <numeric>
#include <algorithm>
#ifdef ASGD_PARALLEL_SUPPORT
#include <multiverso/multiverso.h>
#include <multiverso/util/log.h>
#include <multiverso/util/configure.h>
#include <multiverso/table/array_table.h>
#include <multiverso/updater/updater.h>
#pragma comment(lib, "Multiverso.lib")
#endif
#ifndef CPUONLY
#include <cuda_runtime.h>
#pragma comment (lib, "cudart.lib") // for cudaMemcpyAsync()
#endif
namespace Microsoft { namespace MSR { namespace CNTK {
#ifndef CPUONLY
#define CudaErrorCheck(ans) { gpuAssert((ans), __FILE__, __LINE__); }
inline void gpuAssert(cudaError_t code, const char *file, int line, bool abort = true)
{
if (code != cudaSuccess)
{
fprintf(stderr, "GPUassert: %s %s %d\n", cudaGetErrorString(code), file, line);
if (abort) exit(code);
}
}
#endif
#ifdef ASGD_PARALLEL_SUPPORT
// MultiversoHelper is the implementation of the ASGDHelper interface based on Multiverso
template<class ElemType = float>
class MultiversoHelper : public ASGDHelper<ElemType>
{
public:
MultiversoHelper(const std::list<ComputationNodeBasePtr> & learnableNodes, // Parameters that need to be trained
size_t nodeNumRanks, // Number of working nodes
bool useAsyncBuffered = true, // Use an asynchronous buffer to hide communication cost
bool isSimulatedModelAveragingSGD = false, // Use parameter-server-based MA rather than ASGD
AdjustLearningRateAtBeginning adjusttype = AdjustLearningRateAtBeginning::None, // Adjust the learning rate per minibatch at the very beginning of training,
// which can be used to tackle the instability of ASGD
double adjustCoef = 0.2, // see DecayCoefficient()
size_t adjustPerMinibatches = 600, // adjustment interval, in minibatches
int traceLevel = 0, // log level
int syncPerfStats = 0, // show perf data every syncPerfStats syncs
const MPIWrapperPtr& pMPI = nullptr) :
m_parameterSyncCounter(0), m_adjustLearningRateAtBeginningType(adjusttype),
m_adjustCoefficient(adjustCoef), m_adjustMBNumber(adjustPerMinibatches),
m_totalClientNumber(nodeNumRanks), m_useAsyncBuffered(useAsyncBuffered),
m_traceLevel(traceLevel), m_ModelAveragingSGDSimulating(isSimulatedModelAveragingSGD), m_doesEveryNodesShouldSynced(false),
m_pMPI(pMPI), m_syncPerfStats(syncPerfStats)
{
if (m_ModelAveragingSGDSimulating)
{
m_doesEveryNodesShouldSynced = true;
m_useAsyncBuffered = false;
}
// Pipeline-related variables
m_localBufferNum = m_useAsyncBuffered ? 2 : 1;
m_bufferSwapIndex = new int[m_localBufferNum];
// CPU asynchronous buffer
m_cpuAsyncBuffer = new ElemType*[m_localBufferNum];
// Get/Add options used by the Multiverso sparse update
m_getOptions.reserve(m_localBufferNum);
m_addOptions.reserve(m_localBufferNum);
#ifndef CPUONLY
// GPU asynchronous buffer
m_gpuAsyncBuffer.resize(m_localBufferNum);
// create a communication stream for data transfer between GPU and CPU
CudaErrorCheck(cudaStreamCreate(&_commStream));
#endif
m_bufferIndexInUse = 0;
for (int i = 0; i < m_localBufferNum; i++)
m_bufferSwapIndex[i] = (i + 1) % m_localBufferNum;
m_aysncBufferThread = nullptr;
if (m_traceLevel > 5)
multiverso::Log::ResetLogLevel(multiverso::LogLevel::Debug);
else if (m_traceLevel > 4)
multiverso::Log::ResetLogLevel(multiverso::LogLevel::Error);
if (m_doesEveryNodesShouldSynced)
multiverso::SetCMDFlag("sync", true);
MultiversoInit(learnableNodes);
}
~MultiversoHelper()
{
fprintf(stderr, "~MultiversoHelper\n");
fflush(stderr);
if (m_useAsyncBuffered && m_aysncBufferThread != nullptr && m_aysncBufferThread->joinable())
m_aysncBufferThread->join();
delete[] m_bufferSwapIndex;
#ifndef CPUONLY
CudaErrorCheck(cudaFreeHost(m_deltaArray)); // pinned memory from cudaMallocHost
#else
delete[] m_deltaArray;
#endif
for (size_t i = 0; i < m_localBufferNum; i++)
{
#ifndef CPUONLY
CudaErrorCheck(cudaFreeHost(m_cpuAsyncBuffer[i]));
#else
delete[] m_cpuAsyncBuffer[i];
#endif
}
delete[] m_cpuAsyncBuffer;
#ifndef CPUONLY
CudaErrorCheck(cudaStreamDestroy(_commStream));
#endif
multiverso::MV_ShutDown(false);
}
void InitModel(const std::list<ComputationNodeBasePtr> & learnableNodes) override
{
float factor = 1.0f / m_totalClientNumber;
int i = 0; // index of the current learnable node
for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++)
{
ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
Matrix<ElemType> &mat = node->Value();
#ifndef CPUONLY
for (int j = 0; j < m_localBufferNum; j++)
m_gpuAsyncBuffer[j].push_back(mat.DeepClone());
#endif
ElemType* px = m_cpuAsyncBuffer[0] + m_tableOffsets[i];
mat.CopyToArray(px, m_tableLength[i]);
}
for (int i = 1; i < m_localBufferNum; i++)
memcpy(m_cpuAsyncBuffer[i], m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize);
memcpy(m_deltaArray, m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize);
// Because the parameter server subtracts the delta it receives, each worker sends its initial model negated and scaled by 1/N, so the server ends up holding the average of the initial models.
std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), -factor));
m_workerArray->Add(m_deltaArray, m_totalModelSize);
m_workerArray->Get(m_deltaArray, m_totalModelSize);
WaitAll();
m_workerArray->Get(m_deltaArray, m_totalModelSize);
if (std::equal(m_deltaArray, m_deltaArray + m_totalModelSize, m_cpuAsyncBuffer[0]))
multiverso::Log::Info("multiverso initial model loaded.\n");
m_reportTimer.Start();
}
bool PushAndPullModel(const std::list<ComputationNodeBasePtr> & learnableNodes, size_t sampleSinceLastSynced) override
{
m_parameterSyncCounter++;
double fromCPUToGPUTime;
double fromGPUToCPUTime;
double networkTime;
double swapTimeOnGPU;
m_reportTimer.Restart();
WaitAsyncBuffer();
m_reportTimer.Stop();
// reset statistics for profiling
if (m_traceLevel > 2 && m_syncPerfStats > 0 && m_parameterSyncCounter % m_syncPerfStats == 0)
{
fromCPUToGPUTime = 0;
fromGPUToCPUTime = 0;
networkTime = 0;
swapTimeOnGPU = 0;
}
m_bufferIndexInUse = m_bufferSwapIndex[m_bufferIndexInUse];
int i = 0; // index of the current learnable node
if (m_useAsyncBuffered)
{
m_reportTimer.Restart();
for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++)
{
ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
Microsoft::MSR::CNTK::Matrix<ElemType> &mat = node->Value();
#ifndef CPUONLY
// CNTK model -> GPU buffer
CudaErrorCheck(cudaMemcpy(m_gpuAsyncBuffer[m_bufferIndexInUse][i].Data(),
mat.Data(),
mat.GetNumElements() * sizeof(ElemType),
cudaMemcpyDeviceToDevice));
// GPU buffer -> CNTK model
CudaErrorCheck(cudaMemcpy(mat.Data(),
m_gpuAsyncBuffer[m_bufferSwapIndex[m_bufferIndexInUse]][i].Data(),
mat.GetNumElements() * sizeof(ElemType),
cudaMemcpyDeviceToDevice));
#else
ElemType * px = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[i];
mat.CopyToArray(px, m_tableLength[i]);
ElemType * py = m_cpuAsyncBuffer[m_bufferSwapIndex[m_bufferIndexInUse]] + m_tableOffsets[i];
mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), py);
// px and py point into m_cpuAsyncBuffer and must not be freed here
#endif
}
m_reportTimer.Stop();
if (m_traceLevel > 2)
{
swapTimeOnGPU = m_reportTimer.ElapsedSeconds();
}
#ifndef CPUONLY
m_aysncBufferThread = new thread([&]()
{
float factor = DecayCoefficient();
int deviceId = m_gpuAsyncBuffer[m_bufferIndexInUse][0].GetDeviceId();
CudaErrorCheck(cudaSetDevice(deviceId));
Timer threadTimer;
threadTimer.Restart();
for (int widx = 0; widx < m_tableCount; widx++)
{
ElemType * px = m_deltaArray + m_tableOffsets[widx];
// GPU buffer -> CPU buffer
CudaErrorCheck(cudaMemcpyAsync(px,
m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(),
m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType),
cudaMemcpyDeviceToHost,
_commStream));
}
// wait until the copy from GPU to CPU has finished
CudaErrorCheck(cudaStreamSynchronize(_commStream));
threadTimer.Stop();
if (m_traceLevel > 3)
{
double time = threadTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, GPU -> CPU time %lf \n", time);
}
// delta = local model - last synced model (i.e., the accumulated gradient * learning rate)
std::transform(m_cpuAsyncBuffer[m_bufferIndexInUse],
m_cpuAsyncBuffer[m_bufferIndexInUse] + m_totalModelSize,
m_deltaArray, m_deltaArray,
std::minus<ElemType>());
threadTimer.Restart();
// lr decay
std::transform(m_deltaArray,
m_deltaArray + m_totalModelSize,
m_deltaArray,
std::bind1st(std::multiplies<ElemType>(), factor));
ElemType* px = m_deltaArray;
ElemType* py = m_cpuAsyncBuffer[m_bufferIndexInUse];
m_workerArray->AddAsync(px, m_totalModelSize);
m_workerArray->Get(py, m_totalModelSize);
threadTimer.Stop();
if (m_traceLevel > 3)
{
double time = threadTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time);
}
threadTimer.Restart();
// copy parameters from CPU buffer to GPU buffer
for (int widx = 0; widx < m_tableCount; widx++)
{
ElemType * py = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[widx];
CudaErrorCheck(cudaMemcpyAsync(m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(),
py,
m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType),
cudaMemcpyHostToDevice,
_commStream));
}
CudaErrorCheck(cudaStreamSynchronize(_commStream));
threadTimer.Stop();
if (m_traceLevel > 3)
{
double time = threadTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time);
}
});
#else
m_aysncBufferThread = new thread([&]()
{
float factor = DecayCoefficient();
int t_cacheIdx = m_bufferIndexInUse;
std::transform(m_cpuAsyncBuffer[t_cacheIdx], m_cpuAsyncBuffer[t_cacheIdx] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus<ElemType>());
std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), factor));
ElemType* px = m_deltaArray;
ElemType* py = m_cpuAsyncBuffer[t_cacheIdx];
m_workerArray->AddAsync(px, m_totalModelSize);
m_workerArray->Get(py, m_totalModelSize);
});
#endif
}
else
{
m_reportTimer.Restart();
float factor = DecayCoefficient();
i = 0;
for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++)
{
ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
Microsoft::MSR::CNTK::Matrix<ElemType> &mat = node->Value();
ElemType * px = m_deltaArray + m_tableOffsets[i];
mat.CopyToArray(px, m_tableLength[i]);
}
m_reportTimer.Stop();
if (m_traceLevel > 3)
{
double time = m_reportTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, GPU -> CPU time %lf \n", time);
}
std::transform(m_cpuAsyncBuffer[0], m_cpuAsyncBuffer[0] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus<ElemType>());
// lr decay
if (m_ModelAveragingSGDSimulating)
{
factor = ModelAggregationCoefficient(sampleSinceLastSynced);
std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), factor));
if (m_traceLevel > 2 && m_syncPerfStats != 0)
{
if (m_parameterSyncCounter % m_syncPerfStats == 0)
ReportPerfStats(m_totalClientNumber * m_sampleSinceLastReport, m_sampleSinceLastReport);
else
m_sampleSinceLastReport += sampleSinceLastSynced;
}
}
else
{
std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), factor));
}
m_reportTimer.Restart();
ElemType* px = m_deltaArray;
ElemType* py = m_cpuAsyncBuffer[0];
m_workerArray->AddAsync(px, m_totalModelSize);
m_workerArray->Get(py, m_totalModelSize);
m_reportTimer.Stop();
if (m_traceLevel > 3)
{
double time = m_reportTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time);
}
m_reportTimer.Restart();
i = 0;
for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++)
{
ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
Microsoft::MSR::CNTK::Matrix<ElemType> &mat = node->Value();
ElemType * px = m_cpuAsyncBuffer[0] + m_tableOffsets[i];
mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), px);
}
m_reportTimer.Stop();
if (m_traceLevel > 3)
{
double time = m_reportTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time);
}
}
return true;
}
void WaitAll() override
{
multiverso::MV_Barrier();
}
void WaitAsyncBuffer() override
{
if (m_aysncBufferThread != nullptr && m_aysncBufferThread->joinable())
{
m_aysncBufferThread->join();
delete m_aysncBufferThread;
m_aysncBufferThread = nullptr;
}
}
private:
void MultiversoInit(const std::list<ComputationNodeBasePtr> & learnableNodes)
{
assert(!m_isInitialized);
m_isInitialized = true;
// The parameter server offers a variety of updaters; we only use the SGD updater in this simple case.
multiverso::SetCMDFlag<std::string>(std::string("updater_type"), std::string("sgd"));
multiverso::MV_Init();
int i = 0;
for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++, i++)
{
ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
Matrix<ElemType> &mat = node->Value();
size_t layerSize = mat.GetNumElements();
m_tableLength.push_back(layerSize);
}
m_tableCount = m_tableLength.size();
// calculate the total size of the learnable nodes
m_totalModelSize = accumulate(m_tableLength.begin(), m_tableLength.end(), (size_t)0);
multiverso::MV_Barrier();
size_t idx = 0;
for (size_t len : m_tableLength)
{
m_tableOffsets.push_back(idx);
idx += len;
}
#ifndef CPUONLY
for (int i = 0; i < m_localBufferNum; i++)
m_gpuAsyncBuffer[i].reserve(m_tableCount);
// create pinned memory
for (int i = 0; i < m_localBufferNum; ++i)
CudaErrorCheck(cudaMallocHost((void **)&m_cpuAsyncBuffer[i], sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable));
CudaErrorCheck(cudaMallocHost((void **)&m_deltaArray, sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable));
#else
for (int i = 0; i < m_localBufferNum; i++)
m_cpuAsyncBuffer[i] = new ElemType[m_totalModelSize];
m_deltaArray = new ElemType[m_totalModelSize]; // the delta buffer is needed in the CPU-only path as well
#endif
}
float DecayCoefficient()
{
float f = 1.f;
switch (m_adjustLearningRateAtBeginningType)
{
case AdjustLearningRateAtBeginning::None:
break;
case AdjustLearningRateAtBeginning::Linearly:
f = min(f, max(0.f, (float)(m_adjustCoefficient + (1 - m_adjustCoefficient) / m_adjustMBNumber * m_parameterSyncCounter)));
break;
case AdjustLearningRateAtBeginning::Staircase:
f = min(f, max(0.f, (float)(m_adjustCoefficient * (m_parameterSyncCounter / m_adjustMBNumber + 1))));
break;
default:
break;
}
return f;
}
float ModelAggregationCoefficient(size_t samplesSinceLastSync)
{
float factor = 0;
int nTotalSamples = samplesSinceLastSync;
// TODO[qiwye] will conflict with multiverso
// m_pMPI->AllReduce(&nTotalSamples, 1);
if (nTotalSamples <= 0)
{
factor = 1.0f / m_pMPI->NumNodesInUse();
// give an estimated one
}
else
{
factor = (samplesSinceLastSync + 0.0f) / nTotalSamples;
}
factor = 1.0f / m_pMPI->NumNodesInUse();
return factor;
}
inline void transpose(ElemType *src, ElemType *dst, const int N, const int M)
{
for (auto n = 0; n < N*M; n++) {
auto i = n / N;
auto j = n%N;
dst[n] = src[M*j + i];
}
}
void ReportPerfStats(size_t totalSamplesProcessedSinceLastReport,
size_t localSamplesProcessedSinceLastReport)
{
m_reportTimer.Stop();
double secondsSinceLastReport = m_reportTimer.ElapsedSeconds();
m_reportTimer.Restart();
float totalThroughput = secondsSinceLastReport > 0 ? (float)totalSamplesProcessedSinceLastReport / ((float)secondsSinceLastReport * 1000.0f) : 0.0f;
float throughputPerWorker = totalThroughput / m_totalClientNumber;
string prefix = "\t\t(sim-model aggregation stats) %d-th sync: %8.2f seconds since last report ; %d samples processed by %d workers (%d by me);\n"
"\t\t(sim-model aggregation stats) %d-th sync: totalThroughput = %.2fk samplesPerSecond , throughputPerWorker = %.2fk samplesPerSecond\n";
fprintf(stderr, prefix.c_str(), (int)m_parameterSyncCounter, secondsSinceLastReport, (int)totalSamplesProcessedSinceLastReport, (int)m_totalClientNumber, (int)localSamplesProcessedSinceLastReport,
(int)m_parameterSyncCounter, totalThroughput, throughputPerWorker);
m_sampleSinceLastReport = 0;
}
multiverso::ArrayServer<ElemType>* m_serverArray;
multiverso::ArrayWorker<ElemType>* m_workerArray;
thread * m_aysncBufferThread;
bool m_isInitialized;
bool m_doesEveryNodesShouldSynced;
bool m_ModelAveragingSGDSimulating;
int m_totalClientNumber;
int m_traceLevel;
int m_syncPerfStats;
Timer m_reportTimer;
size_t m_parameterSyncCounter;
size_t m_sampleSinceLastReport;
bool m_useAsyncBuffered;
int m_localBufferNum;
int * m_bufferSwapIndex;
int m_bufferIndexInUse;
std::vector<multiverso::GetOption*> m_getOptions; // used by sparse table
std::vector<multiverso::AddOption*> m_addOptions; // used by sparse table
AdjustLearningRateAtBeginning m_adjustLearningRateAtBeginningType;
double m_adjustCoefficient;
size_t m_adjustMBNumber;
vector<size_t> m_tableLength;
size_t m_totalModelSize;
vector<size_t> m_tableOffsets;
//shared_ptr<ElemType> m_deltaArray;
ElemType * m_deltaArray;
//std::vector<shared_ptr<ElemType> > m_cpuAsyncBuffer;
ElemType ** m_cpuAsyncBuffer;
MPIWrapperPtr m_pMPI;
// GPU double buffer
std::vector<std::vector<Matrix<ElemType> >> m_gpuAsyncBuffer;
int m_tableCount;
#ifndef CPUONLY
cudaStream_t _commStream;
#endif
}; // Class MultiversoHelper
#endif
// A no-op implementation of the ASGDHelper interface, which does nothing.
// This is used when CNTK_ENABLE_ASGD = false.
template<class ElemType = float>
class NoneASGDHelper : public ASGDHelper<ElemType>
{
public:
NoneASGDHelper(const std::list<ComputationNodeBasePtr> & learnableNodes,
int nodeNumRanks,
bool useAsyncBuffered = true,
bool isSimModelAveragingSGD = false,
AdjustLearningRateAtBeginning adjusttype = AdjustLearningRateAtBeginning::None,
double adjustcoef = 0.2,
size_t adjustnbmb = 600,
int traceLevel = 0,
int syncPerfStats = 0,
const MPIWrapperPtr& pMPI = nullptr) { }
~NoneASGDHelper() { }
void InitModel(const std::list<ComputationNodeBasePtr> & learnableNode) override { }
bool PushAndPullModel(const std::list<ComputationNodeBasePtr> & learnableNodes, size_t sampleSinceLastSynced) override { return true; }
void WaitAll() override { }
void WaitAsyncBuffer() override { }
};
template<class ElemType>
ASGDHelper<ElemType>* NewASGDHelper(
const std::list<ComputationNodeBasePtr> & learnableNodes, // Parameters that need to be trained
size_t nodeNumRanks, // Number of working nodes
bool useAsyncBuffered, // Use an asynchronous buffer to hide communication cost
bool isSimulatedModelAveragingSGD,
AdjustLearningRateAtBeginning adjusttype,
double adjustCoef,
size_t adjustPerMinibatches,
int traceLevel,
int syncPerfStats,
const MPIWrapperPtr& pMPI)
{
#ifdef ASGD_PARALLEL_SUPPORT
return new MultiversoHelper<ElemType>(learnableNodes, nodeNumRanks, useAsyncBuffered, isSimulatedModelAveragingSGD,
adjusttype, adjustCoef, adjustPerMinibatches, traceLevel, syncPerfStats, pMPI);
#else
return new NoneASGDHelper<ElemType>(learnableNodes, nodeNumRanks, useAsyncBuffered, isSimulatedModelAveragingSGD,
adjusttype, adjustCoef, adjustPerMinibatches, traceLevel, syncPerfStats, pMPI);
#endif
}
// explicit instantiations, so the template definition above links against the callers in SGD.cpp
template ASGDHelper<float>* NewASGDHelper<float>(const std::list<ComputationNodeBasePtr>&, size_t, bool, bool, AdjustLearningRateAtBeginning, double, size_t, int, int, const MPIWrapperPtr&);
template ASGDHelper<double>* NewASGDHelper<double>(const std::list<ComputationNodeBasePtr>&, size_t, bool, bool, AdjustLearningRateAtBeginning, double, size_t, int, int, const MPIWrapperPtr&);
} // namespace CNTK
} // namespace MSR
} // namespace Microsoft
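One detail worth calling out in the implementation above is the double-buffer rotation: m_bufferSwapIndex maps each buffer to its successor, so while the background thread pushes and pulls one buffer against the server, the trainer swaps its model against the other. A stripped-down sketch of that rotation (names and the printf trace are illustrative):

#include <cstdio>

int main()
{
    const int bufferNum = 2;                // m_localBufferNum when async buffering is on
    int swapIndex[bufferNum];
    for (int i = 0; i < bufferNum; i++)
        swapIndex[i] = (i + 1) % bufferNum; // each buffer points at its successor

    int inUse = 0;                          // m_bufferIndexInUse
    for (int sync = 0; sync < 4; sync++)
    {
        inUse = swapIndex[inUse];           // rotate at the top of each PushAndPullModel
        // the trainer copies its model into `inUse` and reads the previously
        // synced model from `swapIndex[inUse]`, which the thread just filled
        printf("sync %d: trainer writes buffer %d, reads synced buffer %d\n",
               sync, inUse, swapIndex[inUse]);
    }
    return 0;
}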

View file

@@ -18,11 +18,7 @@
#include "V2AllReduceDistGradAggregator.h"
#endif
#ifdef ASGD_PARALLEL_SUPPORT
#include "MultiversoWrapper.h"
#else
#include "NoMultiversoWrapper.h"
#endif
#include "ASGDHelper.h"
#include "SimpleDistGradAggregator.h"
#include "V2SimpleDistGradAggregator.h"
@@ -405,8 +401,7 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
// Multiverso wrapper for ASGD logic init
if (m_parallelizationMethod == ParallelizationMethod::dataParallelASGD)
{
m_pMultiversoHelper =
std::make_shared<MultiversoHelper<ElemType>>(learnableNodes,
m_pASGDHelper.reset(NewASGDHelper<ElemType>(learnableNodes,
m_mpi->NumNodesInUse(),
m_isPipeline,
m_isSimulateMA,
@@ -415,10 +410,10 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
m_adjustPerMinibatches,
m_traceLevel,
m_syncStatsTrace,
m_mpi);
m_pMultiversoHelper->InitModel(learnableNodes);
m_pMultiversoHelperBarrier = false;
m_pMultiversoHelper->WaitAll();
m_mpi));
m_pASGDHelper->InitModel(learnableNodes);
m_pASGDHelperBarrier = false;
m_pASGDHelper->WaitAll();
}
// --- MAIN EPOCH LOOP
@@ -825,7 +820,7 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
delete inputMatrices;
if (m_parallelizationMethod == ParallelizationMethod::dataParallelASGD)
m_pMultiversoHelper.reset();
m_pASGDHelper.reset();
}
// -----------------------------------------------------------------------
@@ -1282,7 +1277,7 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
if (nSamplesSinceLastModelSync >= m_nFramesBetweenASGDSync[epochNumber])
{
m_pMultiversoHelper->PushAndPullModel(learnableNodes, nSamplesSinceLastModelSync);
m_pASGDHelper->PushAndPullModel(learnableNodes, nSamplesSinceLastModelSync);
nSamplesSinceLastModelSync = 0;
}
}
@@ -1411,7 +1406,7 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
if (useAsyncGradientAggregation && (m_mpi->NumNodesInUse() > 1))
{
m_pMultiversoHelper->PushAndPullModel(learnableNodes, nSamplesSinceLastModelSync);
m_pASGDHelper->PushAndPullModel(learnableNodes, nSamplesSinceLastModelSync);
nSamplesSinceLastModelSync = 0;
}
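Both hunks above reduce to the same sample-count trigger. A condensed sketch of the pattern, assuming actualMBSize is the minibatch-size variable of the surrounding training loop:

// accumulate processed samples, then sync once the per-epoch threshold is hit
nSamplesSinceLastModelSync += actualMBSize;
if (nSamplesSinceLastModelSync >= m_nFramesBetweenASGDSync[epochNumber])
{
    m_pASGDHelper->PushAndPullModel(learnableNodes, nSamplesSinceLastModelSync);
    nSamplesSinceLastModelSync = 0; // restart the counter after each sync
}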

View file

@@ -19,7 +19,7 @@
#include <random>
#include "Profiler.h"
#include "MASGD.h"
#include "ASGDCommon.h"
#include "ASGDHelper.h"
using namespace std; // ugh! TODO: get rid of this from .h files!!!
#define CNTK_CHECKPOINT_VERSION_1 1 // 1 -> no version number
@@ -316,10 +316,6 @@ protected:
template <class ElemType>
class IDistGradAggregator;
// MultiversoHelper is used for parallel training using DataParallelASGD
template <class ElemType>
class MultiversoHelper;
// -----------------------------------------------------------------------
// class SGD
// -----------------------------------------------------------------------
@@ -579,8 +575,8 @@ protected:
private:
void MarkDropoutNodesEvalTimeStampAsOutdated(const ComputationNetworkPtr& net, const ComputationNodeBasePtr& criterionNode);
shared_ptr<MultiversoHelper<ElemType>> m_pMultiversoHelper;
bool m_pMultiversoHelperBarrier;
std::shared_ptr<ASGDHelper<ElemType>> m_pASGDHelper;
bool m_pASGDHelperBarrier;
bool UsingGradientAggregation(size_t epochNumber) const
{

View file

@@ -99,13 +99,11 @@
</ItemDefinitionGroup>
<ItemGroup>
<ClInclude Include="..\Common\CrossProcessMutex.h" />
<ClInclude Include="..\Common\Include\ASGDCommon.h" />
<ClInclude Include="..\Common\Include\Basics.h" />
<ClInclude Include="..\Common\Include\BestGpu.h" />
<ClInclude Include="..\Common\Include\Config.h" />
<ClInclude Include="..\Common\Include\DataReader.h" />
<ClInclude Include="..\Common\Include\MultiversoWrapper.h" />
<ClInclude Include="..\Common\Include\NoMultiversoWrapper.h" />
<ClInclude Include="..\Common\Include\ASGDHelper.h" />
<ClInclude Include="..\Common\Include\TensorShape.h" />
<ClInclude Include="..\Common\Include\DataWriter.h" />
<ClInclude Include="..\Common\Include\File.h" />
@@ -143,6 +141,7 @@
<ClInclude Include="V2SimpleDistGradAggregator.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="ASGDHelper.cpp" />
<ClCompile Include="PostComputingActions.cpp" />
<ClCompile Include="Profiler.cpp" />
<ClCompile Include="SGD.cpp" />

View file

@@ -13,6 +13,9 @@
<ClCompile Include="PostComputingActions.cpp">
<Filter>Stat</Filter>
</ClCompile>
<ClCompile Include="ASGDHelper.cpp">
<Filter>Parallelization</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\Common\Include\fileutil.h">
@@ -123,12 +126,6 @@
<ClInclude Include="..\ComputationNetworkLib\TrainingNodes.h">
<Filter>from ComputationNetworkLib\Nodes</Filter>
</ClInclude>
<ClInclude Include="..\Common\Include\MultiversoWrapper.h">
<Filter>Parallelization</Filter>
</ClInclude>
<ClInclude Include="..\Common\Include\NoMultiversoWrapper.h">
<Filter>Parallelization</Filter>
</ClInclude>
<ClInclude Include="MASGD.h">
<Filter>Parallelization</Filter>
</ClInclude>
@@ -141,7 +138,7 @@
<ClInclude Include="V2SimpleDistGradAggregator.h">
<Filter>Parallelization</Filter>
</ClInclude>
<ClInclude Include="..\Common\Include\ASGDCommon.h">
<ClInclude Include="..\Common\Include\ASGDHelper.h">
<Filter>Parallelization</Filter>
</ClInclude>
</ItemGroup>