This commit is contained in:
Qiwei Ye 2016-10-31 01:05:58 +08:00
Родитель 4be2822a65
Коммит 1b92176e43
5 изменённых файлов: 162 добавлений и 149 удалений

Просмотреть файл

@ -7,11 +7,20 @@
namespace Microsoft { namespace MSR { namespace CNTK {
enum class AdjustLearningRateatBeginning : int
// -----------------------------------------------------------------------
// class AdjustLearningRateAtBeginning
// Providing option for DataParallelASGD training. so that every nodes
// could adjust learning rate every minibatch at first N epochs.
// -----------------------------------------------------------------------
// TODO: We can removed these options once we can adjust learning rate at minibatchs level
enum class AdjustLearningRateAtBeginning : int
{
None = 0,
Linearly = 1,
Staircase = (1 << 1),
None = 0, // default, don't adjust learning rate
Linearly = 1, // using linear adjustment, learning rate will from 0 to learningRatesPerMB
Staircase = (1 << 1), // using staircased adjustment, learning rate will from 0 to learningRatesPerMB every adjustNbMinibatch
};
}}}

Просмотреть файл

@ -37,18 +37,17 @@
#include <algorithm>
namespace Microsoft { namespace MSR { namespace CNTK {
#define MULTIVERSO_DEBUG
// #define MULTIVERSO_DEBUG
#ifndef CPUONLY
#define CudaErrorCheck(ans) { gpuAssert((ans), __FILE__, __LINE__); }
inline void gpuAssert(cudaError_t code, const char *file, int line, bool abort = true)
{
if (code != cudaSuccess)
inline void gpuAssert(cudaError_t code, const char *file, int line, bool abort = true)
{
fprintf(stderr, "GPUassert: %s %s %d\n", cudaGetErrorString(code), file, line);
if (abort) exit(code);
if (code != cudaSuccess)
{
fprintf(stderr, "GPUassert: %s %s %d\n", cudaGetErrorString(code), file, line);
if (abort) exit(code);
}
}
}
#endif
template<class ElemType = float>
@ -59,22 +58,22 @@ public:
MultiversoHelper(const std::list<ComputationNodeBasePtr> & learnableNodes,
int nodeNumRanks,
bool useAsyncBuffered = true,
bool isSimModelAveragingSGD = false,
AdjustLearningRateatBeginning adjusttype = AdjustLearningRateatBeginning::None,
double adjustcoef = 0.2,
size_t adjustnbmb = 600,
bool isSimulatedModelAveragingSGD = false,
AdjustLearningRateAtBeginning adjusttype = AdjustLearningRateAtBeginning::None,
double adjustCoef = 0.2,
size_t adjustPerMinibatches = 600,
int traceLevel = 0,
int syncPerfStats = 0,
const MPIWrapperPtr& pMPI = nullptr)
: m_parameterSyncCounter(0), m_adjustLearningRateAtBeginningType(adjusttype),
m_adjustCoefficient(adjustcoef), m_adjustMBNumber(adjustnbmb),
m_adjustCoefficient(adjustCoef), m_adjustMBNumber(adjustPerMinibatches),
m_totalClientNumber(nodeNumRanks), m_useAsyncBuffered(useAsyncBuffered),
m_traceLevel(traceLevel), m_useSimModelAveragingSGD(isSimModelAveragingSGD), m_isSycned(false),
m_traceLevel(traceLevel), m_ModelAveragingSGDSimulating(isSimulatedModelAveragingSGD), m_doesEveryNodesShouldSynced(false),
m_pMPI(pMPI), m_syncPerfStats(syncPerfStats)
{
if (m_useSimModelAveragingSGD)
if (m_ModelAveragingSGDSimulating)
{
m_isSycned = true;
m_doesEveryNodesShouldSynced = true;
m_useAsyncBuffered = false;
}
// Pipeline releated variables
@ -105,7 +104,7 @@ MultiversoHelper(const std::list<ComputationNodeBasePtr> & learnableNodes,
else if (m_traceLevel > 4)
multiverso::Log::ResetLogLevel(multiverso::LogLevel::Error);
if (m_isSycned)
if (m_doesEveryNodesShouldSynced)
multiverso::SetCMDFlag("sync", true);
MultiversoInit(learnableNodes);
@ -136,7 +135,8 @@ MultiversoHelper(const std::list<ComputationNodeBasePtr> & learnableNodes,
multiverso::MV_ShutDown(false);
}
// upoload preCompute model to the parameter servers
// upoload initilized model(, which was pre-computed by CNTK logic) to the parameter servers, so that
// every node could start training at a same model.
void InitModel(const std::list<ComputationNodeBasePtr> & learnableNodes)
{
float factor = 1.0f / m_totalClientNumber;
@ -163,6 +163,8 @@ void InitModel(const std::list<ComputationNodeBasePtr> & learnableNodes)
// because the parameter server will minus the delta on the server, so that we should send the minus initial model to the server.
std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), -factor));
// Using matrix_table on multiverso could make CNTK sync with parameter server layer by layer possable,
// but it will also slower about 10% than array_table.
#ifdef MULTIVERSO_USE_MATRIXTABLE
for (int widx = 0; widx < m_tableCount; widx++)
{
@ -197,15 +199,15 @@ void InitModel(const std::list<ComputationNodeBasePtr> & learnableNodes)
m_reportTimer.Start();
}
// pushing parameters of learnableNodes to parameter servers, then get the latests model back.
// Push parameters of learnableNodes to parameter servers, then get the latests model back.
bool PushAndPullModel(const std::list<ComputationNodeBasePtr> & learnableNodes, size_t sampleSinceLastSynced = 0)
{
m_parameterSyncCounter++;
double CPUToGPUTime;
double GPUToCPUTime;
double fromCPUToGPUTime;
double fromGPUToCPUTime;
double networkTime;
double GPUSwapTime;
double swapTimeOnGPU;
m_reportTimer.Restart();
WaitAsyncBuffer();
m_reportTimer.Stop();
@ -213,10 +215,10 @@ bool PushAndPullModel(const std::list<ComputationNodeBasePtr> & learnableNodes,
// reset statics for profiling
if (m_traceLevel > 2 && m_syncPerfStats > 0 && m_parameterSyncCounter % m_syncPerfStats == 0)
{
CPUToGPUTime = 0;
GPUToCPUTime = 0;
fromCPUToGPUTime = 0;
fromGPUToCPUTime = 0;
networkTime = 0;
GPUSwapTime = 0;
swapTimeOnGPU = 0;
}
m_bufferIndexInUse = m_bufferSwapIndex[m_bufferIndexInUse];
@ -252,10 +254,11 @@ bool PushAndPullModel(const std::list<ComputationNodeBasePtr> & learnableNodes,
m_reportTimer.Stop();
if (m_traceLevel > 2)
{
GPUSwapTime = m_reportTimer.ElapsedSeconds();
swapTimeOnGPU = m_reportTimer.ElapsedSeconds();
}
#ifndef CPUONLY
m_aysncBufferThread = new thread([&](){
m_aysncBufferThread = new thread([&]()
{
float factor = DecayCoefficient();
int deviceId = m_gpuAsyncBuffer[m_bufferIndexInUse][0].GetDeviceId();
@ -323,35 +326,36 @@ bool PushAndPullModel(const std::list<ComputationNodeBasePtr> & learnableNodes,
m_workerArray->Get(py, m_totalModelSize);
#endif
threadTimer.Stop();
if (m_traceLevel > 3)
{
double time = threadTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time);
}
threadTimer.Stop();
if (m_traceLevel > 3)
{
double time = threadTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, Worker <--> Multiverso time %lf \n", time);
}
threadTimer.Restart();
// copy parameters from CPU buffer to GPU buffer
for (int widx = 0; widx < m_tableCount; widx++)
{
ElemType * py = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[widx];
threadTimer.Restart();
// copy parameters from CPU buffer to GPU buffer
for (int widx = 0; widx < m_tableCount; widx++)
{
ElemType * py = m_cpuAsyncBuffer[m_bufferIndexInUse] + m_tableOffsets[widx];
CudaErrorCheck(cudaMemcpyAsync(m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(),
py,
m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType),
cudaMemcpyHostToDevice,
_commStream));
}
CudaErrorCheck(cudaStreamSynchronize(_commStream));
threadTimer.Stop();
if (m_traceLevel > 3)
{
double time = threadTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time);
}
CudaErrorCheck(cudaMemcpyAsync(m_gpuAsyncBuffer[m_bufferIndexInUse][widx].Data(),
py,
m_gpuAsyncBuffer[m_bufferIndexInUse][widx].GetNumElements() * sizeof(ElemType),
cudaMemcpyHostToDevice,
_commStream));
}
CudaErrorCheck(cudaStreamSynchronize(_commStream));
threadTimer.Stop();
if (m_traceLevel > 3)
{
double time = threadTimer.ElapsedSeconds();
fprintf(stderr, "\t\t -- pullAndRequest, CPU -> GPU time %lf \n", time);
}
});
#else
m_aysncBufferThread = new thread([&](){
m_aysncBufferThread = new thread([&]()
{
float factor = DecayCoefficient();
int t_cacheIdx = m_bufferIndexInUse;
@ -399,7 +403,7 @@ bool PushAndPullModel(const std::list<ComputationNodeBasePtr> & learnableNodes,
std::transform(m_cpuAsyncBuffer[0], m_cpuAsyncBuffer[0] + m_totalModelSize, m_deltaArray, m_deltaArray, std::minus<ElemType>());
// lr decay
if (m_useSimModelAveragingSGD)
if (m_ModelAveragingSGDSimulating)
{
factor = ModelAggregationCoefficient(sampleSinceLastSynced);
std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), factor));
@ -591,12 +595,12 @@ private:
float f = 1.f;
switch (m_adjustLearningRateAtBeginningType)
{
case AdjustLearningRateatBeginning::None:
case AdjustLearningRateAtBeginning::None:
break;
case AdjustLearningRateatBeginning::Linearly:
case AdjustLearningRateAtBeginning::Linearly:
f = min(f, max(0.f, (float)(m_adjustCoefficient + (1 - m_adjustCoefficient) / m_adjustMBNumber * m_parameterSyncCounter)));
break;
case AdjustLearningRateatBeginning::Staircase:
case AdjustLearningRateAtBeginning::Staircase:
f = min(f, max(0.f, (float)(m_adjustCoefficient * (m_parameterSyncCounter / m_adjustMBNumber + 1))));
break;
default:
@ -664,8 +668,8 @@ private:
thread * m_aysncBufferThread;
bool m_isInitialized;
bool m_isSycned;
bool m_useSimModelAveragingSGD;
bool m_doesEveryNodesShouldSynced;
bool m_ModelAveragingSGDSimulating;
int m_totalClientNumber;
int m_traceLevel;
@ -682,7 +686,7 @@ private:
std::vector< multiverso::AddOption*> m_addOptions; // used by sparse table
AdjustLearningRateatBeginning m_adjustLearningRateAtBeginningType;
AdjustLearningRateAtBeginning m_adjustLearningRateAtBeginningType;
double m_adjustCoefficient;
size_t m_adjustMBNumber;
@ -702,6 +706,4 @@ private:
cudaStream_t _commStream;
#endif
};
} // namespace CNTK
} // namespace MSR
} // namespace Microsoft
}}}

Просмотреть файл

@ -18,7 +18,7 @@
#include "V2AllReduceDistGradAggregator.h"
#endif
#ifdef MULTIVERSO_SUPPORT
#ifdef ASGD_PARALLEL_SUPPORT
#include "MultiversoWrapper.h"
#else
#include "NoMultiversoWrapper.h"
@ -402,17 +402,17 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
m_seqGammarCalcAMF, m_seqGammarCalcLMF, m_seqGammarCalcWP, m_seqGammarCalcbMMIFactor, m_seqGammarCalcUsesMBR);
}
//Multiverso Warpper for ASGD logic init
// Multiverso Warpper for ASGD logic init
if (m_parallelizationMethod == ParallelizationMethod::dataParallelASGD)
{
m_pMultiversoHelper =
new MultiversoHelper<ElemType>(learnableNodes,
std::make_shared<MultiversoHelper<ElemType>>(learnableNodes,
m_mpi->NumNodesInUse(),
m_isPipeline,
m_isSimulateMA,
m_adjustlearningrateatbeginning,
m_adjustcoefficient,
m_adjustnbminibatch,
m_adjustLearningRateAtBeginning,
m_adjustCoefficient,
m_adjustPerMinibatches,
m_traceLevel,
m_syncStatsTrace,
m_mpi);
@ -593,7 +593,7 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
if (validationSetDataReader != trainSetDataReader && validationSetDataReader != nullptr)
{
// TODO(dataASGD) making evaluator becoming nondistributed one when using asynchonized data parallel.
// TODO(dataASGD) making evaluator becoming nondistributed one when using ASGD.
SimpleEvaluator<ElemType> evalforvalidation(net, UsingAsyncGradientAggregation(i + 1) ?nullptr : m_mpi, m_enableDistributedMBReading);
vector<wstring> cvSetTrainAndEvalNodes;
if (criterionNodes.size() > 0)
@ -606,7 +606,6 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
}
// BUGBUG: We should not use the training MB size. The training MB size is constrained by both convergence and memory. Eval is only constrained by memory.
// Todo(dataASGD) adding an options for that the cross validating should set to nondistributed reader while using asynchonized data parallel
let vScore = evalforvalidation.Evaluate(validationSetDataReader, cvSetTrainAndEvalNodes, m_mbSize[i]);
LOGPRINTF(stderr, "Finished Epoch[%2d of %d]: [Validate] ", i + 1, (int)m_maxEpochs);
for (size_t k = 0; k < vScore.size() /*&& k < 2*/; k++)
@ -826,7 +825,8 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
delete inputMatrices;
if (m_parallelizationMethod == ParallelizationMethod::dataParallelASGD)
delete m_pMultiversoHelper;
//delete m_pMultiversoHelper;
m_pMultiversoHelper.reset();
}
// -----------------------------------------------------------------------
@ -997,7 +997,7 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
double readTime = 0;
double computeTime = 0;
double parameterUpdateTime = 0;
double parameterSyncTime = 0;
double parameterSyncTime = 0; // perf communication time between syncs.
if (m_perfTraceLevel > 0)
fineGrainedPerfMeasurementTimer.Start();
@ -1285,7 +1285,7 @@ size_t SGD<ElemType>::TrainOneEpoch(ComputationNetworkPtr net,
{
m_pMultiversoHelper->PushAndPullModel(learnableNodes, nSamplesSinceLastModelSync);
nSamplesSinceLastModelSync = 0;
}
}
}
@ -2563,11 +2563,11 @@ static LearningRateSearchAlgorithm ParseLearningRateSearchType(const wstring& s)
else InvalidArgument("autoAdjustLR: Invalid learning rate search type. Valid values are (none | searchBeforeEpoch | adjustAfterEpoch)");
}
static AdjustLearningRateatBeginning AdjustLearningRateAtBeginningType(wstring s)
static AdjustLearningRateAtBeginning AdjustLearningRateAtBeginningType(wstring s)
{
if (EqualCI(s.c_str(), L"") || EqualCI(s.c_str(), L"none")) return AdjustLearningRateatBeginning::None;
else if (EqualCI(s.c_str(), L"linearly")) return AdjustLearningRateatBeginning::Linearly;
else if (EqualCI(s.c_str(), L"staircase")) return AdjustLearningRateatBeginning::Staircase;
if (EqualCI(s.c_str(), L"") || EqualCI(s.c_str(), L"none")) return AdjustLearningRateAtBeginning::None;
else if (EqualCI(s.c_str(), L"linearly")) return AdjustLearningRateAtBeginning::Linearly;
else if (EqualCI(s.c_str(), L"staircase")) return AdjustLearningRateAtBeginning::Staircase;
else InvalidArgument("AdjustLearningRateatBeginningType: Invalid Type. Valid values are (None | Linearly | Staircase)");
}
@ -2832,15 +2832,15 @@ SGDParams::SGDParams(const ConfigRecordType& configSGD, size_t sizeofElemType)
else
{
size_t numMPIWorkers = pMPI->NumNodesInUse();
const ConfigRecordType& configParallelTrain(configSGD(L"ParallelTrain", ConfigRecordType::Record()));
m_parallelizationMethod = ParseParallelizationMethod(configParallelTrain(L"parallelizationMethod", L"none"));
m_parallelizationStartEpochNum = configParallelTrain(L"parallelizationStartEpoch", (int) 1) - 1; // Internally, epoch numbers are 0-based
if (m_parallelizationStartEpochNum < 0 /* sic */)
const ConfigRecordType& configParallelTrain(configSGD(L"ParallelTrain", ConfigRecordType::Record()));
m_parallelizationMethod = ParseParallelizationMethod(configParallelTrain(L"parallelizationMethod", L"none"));
m_parallelizationStartEpochNum = configParallelTrain(L"parallelizationStartEpoch", (int)1) - 1; // Internally, epoch numbers are 0-based
if (m_parallelizationStartEpochNum < 0 /* sic */)
// Be explicit that user-facing epoch numbers are 1-based
InvalidArgument("parallelizationStartEpoch must be greater or equal to 1");
m_enableDistributedMBReadingNotSpecified = !configParallelTrain.Exists(L"distributedMBReading");
m_enableDistributedMBReading = configParallelTrain(L"distributedMBReading", false);
m_syncStatsTrace = configParallelTrain(L"syncPerfStats", (int) 0);
m_enableDistributedMBReadingNotSpecified = !configParallelTrain.Exists(L"distributedMBReading");
m_enableDistributedMBReading = configParallelTrain(L"distributedMBReading", false);
m_syncStatsTrace = configParallelTrain(L"syncPerfStats", (int)0);
if (configParallelTrain.Exists(L"DataParallelSGD"))
{
@ -2858,62 +2858,62 @@ SGDParams::SGDParams(const ConfigRecordType& configSGD, size_t sizeofElemType)
if (configParallelTrain.Exists(L"ModelAveragingSGD"))
{
const ConfigRecordType& configMASGD(configParallelTrain(L"ModelAveragingSGD", ConfigRecordType::Record()));
if (configMASGD.Exists(L"blockSizePerWorker") && configMASGD.Exists(L"blockSize"))
InvalidArgument("It is only allowed to set blockSizePerWorker or blockSize, not both of them");
else if (configMASGD.Exists(L"blockSize"))
m_modelAggregationBlockSize = configMASGD(L"blockSize");
else if (configMASGD.Exists(L"blockSizePerWorker"))
{
m_modelAggregationBlockSize = configMASGD(L"blockSizePerWorker");
m_modelAggregationBlockSize *= numMPIWorkers;
}
else
m_modelAggregationBlockSize = 40000 * numMPIWorkers; // default value
if (configMASGD.Exists(L"blockSizePerWorker") && configMASGD.Exists(L"blockSize"))
InvalidArgument("It is only allowed to set blockSizePerWorker or blockSize, not both of them");
else if (configMASGD.Exists(L"blockSize"))
m_modelAggregationBlockSize = configMASGD(L"blockSize");
else if (configMASGD.Exists(L"blockSizePerWorker"))
{
m_modelAggregationBlockSize = configMASGD(L"blockSizePerWorker");
m_modelAggregationBlockSize *= numMPIWorkers;
}
else
m_modelAggregationBlockSize = 40000 * numMPIWorkers; // default value
#if 1 // legacy option
if (configMASGD.Exists(L"syncFrequencyInFrames"))
{
if (configMASGD.Exists(L"blockSizePerWorker") || configMASGD.Exists(L"blockSize"))
InvalidArgument("syncFrequencyInFrames is a deprecated alias of blockSizePerWorker. It is not allowed to specify both of them");
m_modelAggregationBlockSize = configMASGD(L"syncFrequencyInFrames");
m_modelAggregationBlockSize *= numMPIWorkers;
fprintf(stderr, "WARNING: option syncFrequencyInFrames in ModelAveragingSGD is going to be deprecated. Please use blockSizePerWorker instead\n");
}
if (configMASGD.Exists(L"syncPeroid"))
{
if (configMASGD.Exists(L"blockSizePerWorker") || configMASGD.Exists(L"blockSize"))
InvalidArgument("syncPeriod is a deprecated alias of blockSizePerWorker. It is not allowed to specify both of them");
m_modelAggregationBlockSize = configMASGD(L"syncPeriod");
m_modelAggregationBlockSize *= numMPIWorkers;
fprintf(stderr, "WARNING: option syncPeroid in ModelAveragingSGD is going to be deprecated. Please use blockSizePerWorker instead in the future.\n");
if (configMASGD.Exists(L"blockSizePerWorker") || configMASGD.Exists(L"blockSize"))
InvalidArgument("syncFrequencyInFrames is a deprecated alias of blockSizePerWorker. It is not allowed to specify both of them");
m_modelAggregationBlockSize = configMASGD(L"syncFrequencyInFrames");
m_modelAggregationBlockSize *= numMPIWorkers;
fprintf(stderr, "WARNING: option syncFrequencyInFrames in ModelAveragingSGD is going to be deprecated. Please use blockSizePerWorker instead\n");
}
if (configMASGD.Exists(L"syncPeroid"))
{
if (configMASGD.Exists(L"blockSizePerWorker") || configMASGD.Exists(L"blockSize"))
InvalidArgument("syncPeriod is a deprecated alias of blockSizePerWorker. It is not allowed to specify both of them");
m_modelAggregationBlockSize = configMASGD(L"syncPeriod");
m_modelAggregationBlockSize *= numMPIWorkers;
fprintf(stderr, "WARNING: option syncPeroid in ModelAveragingSGD is going to be deprecated. Please use blockSizePerWorker instead in the future.\n");
}
#endif
}
if (configParallelTrain.Exists(L"BlockMomentumSGD"))
{
#ifndef CNTK_PARALLEL_TRAINING_SUPPORT
InvalidArgument("BlockMomentumSGD is not enabled in this version.\n");
InvalidArgument("BlockMomentumSGD is not enabled in this version.\n");
#else
const ConfigRecordType& configBMSGD(configParallelTrain(L"BlockMomentumSGD", ConfigRecordType::Record()));
if (configBMSGD.Exists(L"blockSize") && configBMSGD.Exists(L"blockSizePerWorker"))
InvalidArgument("It is only allowed to set blockSizePerWorker or blockSize, not both of them");
else if (configBMSGD.Exists(L"blockSizePerWorker"))
{
m_modelAggregationBlockSize = configBMSGD(L"blockSizePerWorker");
m_modelAggregationBlockSize *= numMPIWorkers;
}
else if (configBMSGD.Exists(L"blockSize"))
m_modelAggregationBlockSize = configBMSGD(L"blockSize");
else
m_modelAggregationBlockSize = 120000 * numMPIWorkers; // default value
if (configBMSGD.Exists(L"blockSize") && configBMSGD.Exists(L"blockSizePerWorker"))
InvalidArgument("It is only allowed to set blockSizePerWorker or blockSize, not both of them");
else if (configBMSGD.Exists(L"blockSizePerWorker"))
{
m_modelAggregationBlockSize = configBMSGD(L"blockSizePerWorker");
m_modelAggregationBlockSize *= numMPIWorkers;
}
else if (configBMSGD.Exists(L"blockSize"))
m_modelAggregationBlockSize = configBMSGD(L"blockSize");
else
m_modelAggregationBlockSize = 120000 * numMPIWorkers; // default value
#if 1 // legacy option
if (configBMSGD.Exists(L"syncPeriod"))
{
if (configBMSGD.Exists(L"blockSizePerWorker") || configBMSGD.Exists(L"blockSize"))
InvalidArgument("syncPeriod is a deprecated alias of blockSizePerWorker. It is not allowed to specify both of them");
m_modelAggregationBlockSize = configBMSGD(L"syncPeriod");
m_modelAggregationBlockSize *= numMPIWorkers;
fprintf(stderr, "WARNING: option syncPeroid in BlockMomentumSGD is going to be deprecated. Please use blockSizePerWorker instead in the future.\n");
}
if (configBMSGD.Exists(L"syncPeriod"))
{
if (configBMSGD.Exists(L"blockSizePerWorker") || configBMSGD.Exists(L"blockSize"))
InvalidArgument("syncPeriod is a deprecated alias of blockSizePerWorker. It is not allowed to specify both of them");
m_modelAggregationBlockSize = configBMSGD(L"syncPeriod");
m_modelAggregationBlockSize *= numMPIWorkers;
fprintf(stderr, "WARNING: option syncPeroid in BlockMomentumSGD is going to be deprecated. Please use blockSizePerWorker instead in the future.\n");
}
#endif
m_resetSGDMomentum = configBMSGD(L"resetSGDMomentum", true);
m_useNesterovBlockMomentum = configBMSGD(L"useNesterovMomentum", true);
@ -2931,29 +2931,30 @@ SGDParams::SGDParams(const ConfigRecordType& configSGD, size_t sizeofElemType)
else if (configBMSGD.Exists(L"blockMomentumPerSync"))
{
double blockMomentum = configBMSGD(L"blockMomentumPerSync");
m_blockMomentumAsTimeConstant = BlockMomentumSGD<double>::Momentum2TimeConstant(blockMomentum, m_modelAggregationBlockSize);
m_blockMomentumAsTimeConstant = BlockMomentumSGD<double>::Momentum2TimeConstant(blockMomentum, m_modelAggregationBlockSize);
}
#endif
else /*if (!configBMSGD.Exists(L"blockMomentumPerSync") && !configBMSGD.Exists(L"blockMomentumAsTimeConstant"))*/
{
double blockMomentum = 1.0 - 1.0 / (double)numMPIWorkers; // this is a default value which ensures each block update contributes equally
m_blockMomentumAsTimeConstant = BlockMomentumSGD<double>::Momentum2TimeConstant(blockMomentum, m_modelAggregationBlockSize);
double blockMomentum = 1.0 - 1.0 / (double)numMPIWorkers; // this is a default value which ensures each block update contributes equally
m_blockMomentumAsTimeConstant = BlockMomentumSGD<double>::Momentum2TimeConstant(blockMomentum, m_modelAggregationBlockSize);
}
#endif
}
if (configParallelTrain.Exists(L"DataParallelASGD"))
{
const ConfigRecordType & configDataParallelASGD(configParallelTrain(L"DataParallelASGD", ConfigRecordType::Record()));
m_nFramesBetweenASGDSync = configDataParallelASGD(L"syncPeriod", ConfigRecordType::Array(intargvector(vector<int>{256})));
m_isPipeline = configDataParallelASGD(L"UsePipeline", false);
m_isSimulateMA = configDataParallelASGD(L"SimModelAverage", false);
if (configDataParallelASGD.Exists(L"AdjustLearningRateAtBeginning"))
m_isSimulateMA = configDataParallelASGD(L"SimModelAverage", false); // using parameter server-based version of ModefAveragingSGD
if (configDataParallelASGD.Exists(L"AdjustLearningRateAtBeginning")) // adjust learning rate per m_adjustNumInBatch minibatchs until to original one
// this option could be used to takcle the unstableness of ASGD
{
const ConfigRecordType & configAdjustLearningRateAtBeginning(configDataParallelASGD(L"AdjustLearningRateAtBeginning", ConfigRecordType::Record()));
m_adjustlearningrateatbeginning = AdjustLearningRateAtBeginningType(configAdjustLearningRateAtBeginning(L"adjustType", L"None"));
m_adjustcoefficient = configAdjustLearningRateAtBeginning(L"adjustCoefficient", (double)0.1);
m_adjustnbminibatch = configAdjustLearningRateAtBeginning(L"adjustNbMinibatch", (size_t)256);
m_adjustLearningRateAtBeginning = AdjustLearningRateAtBeginningType(configAdjustLearningRateAtBeginning(L"adjustType", L"None"));
m_adjustCoefficient = configAdjustLearningRateAtBeginning(L"adjustCoefficient", (double)0.1);
m_adjustPerMinibatches = configAdjustLearningRateAtBeginning(L"adjustPerMinibatches", (size_t)256);
}
}
} // if (!pMPI)

Просмотреть файл

@ -5,7 +5,6 @@
#pragma once
#include "Basics.h"
#include "ASGDCommon.h"
#include "ComputationNetwork.h"
#include "SimpleEvaluator.h"
#include "DataReader.h"
@ -20,6 +19,7 @@
#include <random>
#include "Profiler.h"
#include "MASGD.h"
#include "ASGDCommon.h"
using namespace std; // ugh! TODO: get rid of this from .h files!!!
#define CNTK_CHECKPOINT_VERSION_1 1 // 1 -> no version number
@ -288,16 +288,16 @@ protected:
double m_L1RegWeight;
// Parallel training related with ASGD
intargvector m_numMBsToASGDPushAndPull; // decide how many minibatchs should ASGD to a pull&push to parameter server.
intargvector m_numMiniBatchesToPushAndPullforASGD; // decide how many minibatchs should ASGD to a pull&push to parameter server.
// note that, this will override m_nFramesBetweenASGDSync when set.
intargvector m_nFramesBetweenASGDSync;
bool m_isPipeline;
bool m_isSimulateMA;
AdjustLearningRateatBeginning m_adjustlearningrateatbeginning;
double m_adjustcoefficient;
size_t m_adjustnbminibatch;
AdjustLearningRateAtBeginning m_adjustLearningRateAtBeginning;
double m_adjustCoefficient;
size_t m_adjustPerMinibatches;
//sequence training
// sequence training
double m_hSmoothingWeight;
double m_frameDropThresh;
bool m_doReferenceAlign;
@ -316,6 +316,7 @@ protected:
template <class ElemType>
class IDistGradAggregator;
// MultiversoHelper is used for parallel training using DataParallelASGD
template <class ElemType>
class MultiversoHelper;
@ -578,7 +579,7 @@ protected:
private:
void MarkDropoutNodesEvalTimeStampAsOutdated(const ComputationNetworkPtr& net, const ComputationNodeBasePtr& criterionNode);
MultiversoHelper<ElemType>* m_pMultiversoHelper;
shared_ptr<MultiversoHelper<ElemType>> m_pMultiversoHelper;
bool m_pMultiversoHelperBarrier;
bool UsingGradientAggregation(size_t epochNumber) const

Просмотреть файл

@ -48,7 +48,7 @@
</PrecompiledHeader>
<PreprocessorDefinitions>WIN32;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions Condition="'$(CNTK_ENABLE_1BitSGD)'=='true'">QUANTIZED_GRADIENT_AGGREGATION;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions Condition="'$(CNTK_ENABLE_ASGD)'=='true'">MULTIVERSO_SUPPORT;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions Condition="'$(CNTK_ENABLE_ASGD)'=='true'">ASGD_PARALLEL_SUPPORT;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<DisableSpecificWarnings>4819</DisableSpecificWarnings>
</ClCompile>
<Link>