debug check. WARNING: should be reverted later

Qiwei Ye 2016-02-16 11:47:37 +08:00
Parent 83a60750d7
Commit da13c84d9d
1 changed file with 27 additions and 10 deletions
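The diff below adds temporary printf trace points ("DEBUG POINT" lines and short status messages) around the Multiverso table creation and Add/Get calls, and switches the constructor argument from localWorkerNumber to MPINodeNum. Since the commit message promises a later revert, here is a minimal sketch of how such trace points could be gated behind a single compile-time switch so silencing them is a one-line change; the macro names are hypothetical and not part of this commit.

// Hypothetical helper, not part of this commit: gate the temporary
// trace points behind one switch so they can be turned off in one place.
#include <cstdio>

#define ASGD_DEBUG_TRACE 1   // set to 0 (or revert) to silence all trace points

#if ASGD_DEBUG_TRACE
#define DEBUG_POINT() std::printf("DEBUG POINT: %s, %d \n", __FILE__, __LINE__)
#else
#define DEBUG_POINT() ((void)0)
#endif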

View file

@@ -1,7 +1,5 @@
 #pragma once
-// This uses Multiverso.h which requires
-// the header files in ..\Multiverso\include
 #include <multiverso/multiverso.h>
 #include <multiverso/table/array_table.h>
 #pragma comment(lib, "IMultiverso.lib")
@@ -50,7 +48,7 @@ namespace Microsoft {
 typedef shared_ptr<ComputationNode<ElemType>> ComputationNodePtr;
 public:
 MultiversoWrapper(const std::list<ComputationNodeBasePtr> & learnableNodes,
-int localWorkerNumber,
+int MPINodeNum,
 bool isPipeline = true,
 AdjustLearningRateatBeginning adjusttype = AdjustLearningRateatBeginning::None,
 double adjustcoef = 0.2,
@@ -63,7 +61,7 @@ namespace Microsoft {
 //m_multiversoAdaptor = false;
-m_totalClientNumber = localWorkerNumber;
+m_totalClientNumber = MPINodeNum;
 //Pipeline releated variables
 m_isPipelined = isPipeline;
@@ -89,7 +87,7 @@ namespace Microsoft {
 m_modelSizeOfEachServer = new size_t[m_totalClientNumber];
 m_indexOfEachServer = new size_t[m_totalClientNumber];
-MultiversoInit(learnableNodes, 1);
+MultiversoInit(learnableNodes);
 }
 ~MultiversoWrapper()
@@ -122,6 +120,7 @@ namespace Microsoft {
 // This function will upload parameters into Multiverso
 void InitModel(const std::list<ComputationNodeBasePtr> & learnableNodes)
 {
+printf("DEBUG POINT: %s, %d \n", __FILE__, __LINE__);
 float factor = (float) 1.0 / m_totalClientNumber;
 //weights
@@ -138,6 +137,7 @@ namespace Microsoft {
 ElemType* px = m_cpuAsyncBuffer[0] + m_tableIndex[i];
 mat.CopyToArray(px, m_tableLength[i]);
 }
+printf("DEBUG POINT: %s, %d \n", __FILE__, __LINE__);
 for (int i = 1; i < m_localCacheNumber; i++)
 memcpy(m_cpuAsyncBuffer[i], m_cpuAsyncBuffer[0], sizeof(ElemType) * m_totalModelSize);
@@ -146,7 +146,9 @@ namespace Microsoft {
 std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), factor));
+printf("DEBUG POINT: %s, %d \n", __FILE__, __LINE__);
 m_sharedArray->Add(m_deltaArray, m_totalModelSize);
+printf("DEBUG POINT: %s, %d \n", __FILE__, __LINE__);
 m_sharedArray->Get();
 //for (int row = 0; row < m_totalClientNumber; ++row)
 // m_multiversoAdaptor->Add(table_id, row, m_deltaArray + m_indexOfEachServer[row], factor);
@@ -154,6 +156,7 @@ namespace Microsoft {
 //m_multiversoAdaptor->BatchLoad(table_id, m_deltaArray, m_indexOfEachServer, m_modelSizeOfEachServer);
 memcpy(m_deltaArray, m_sharedArray->raw().data(), sizeof(ElemType) * m_totalModelSize);
+printf("DEBUG POINT: %s, %d \n", __FILE__, __LINE__);
 }
 //Todo: support auto adjust learning rate
@@ -162,6 +165,7 @@ namespace Microsoft {
 //ASGD logic
 void PushAndPullModel(const std::list<ComputationNodeBasePtr> & learnableNodes)
 {
+printf("pushAndPullModel\n");
 //Note: maybe overflow.
 m_modelSyncCount++;
@@ -169,6 +173,7 @@ namespace Microsoft {
 //if (m_isPipelined && m_prefetchThread->joinable())
 // m_prefetchThread->join();
 WaitAsyncBuffer();
+printf("waitAsyncBuffer()");
 m_cacheIndex = m_cacheSwapIndex[m_cacheIndex];
@@ -233,8 +238,11 @@ namespace Microsoft {
 // lr decay
 std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), factor));
+printf("pre added.\n");
 m_sharedArray->Add(m_deltaArray, m_totalModelSize);
+printf("added.\n");
 m_sharedArray->Get();
+printf("geted.\n");
 memcpy(m_cpuAsyncBuffer[t_cacheIdx], m_sharedArray->raw().data(), m_totalModelSize);
 //////Communication
 //for (int row = 0; row < m_totalClientNumber; row++)
@@ -287,9 +295,11 @@ namespace Microsoft {
 // lr decay
 std::transform(m_deltaArray, m_deltaArray + m_totalModelSize, m_deltaArray, std::bind1st(std::multiplies<ElemType>(), factor));
+printf("pre added.\n");
 m_sharedArray->Add(m_deltaArray, m_totalModelSize);
+printf("added.\n");
 m_sharedArray->Get();
+printf("gotted.\n");
 memcpy(m_cpuAsyncBuffer[0], m_sharedArray->raw().data(), m_totalModelSize);
 //for (int row = 0; row < m_totalClientNumber; row++)
@@ -332,7 +342,7 @@ namespace Microsoft {
 m_prefetchThread->join();
 }
 private:
-void MultiversoInit(const std::list<ComputationNodeBasePtr> & learnableNodes, int localWorkerNumber)
+void MultiversoInit(const std::list<ComputationNodeBasePtr> & learnableNodes)
 {
 assert(!m_isInitialized);
 m_isInitialized = true;
@@ -360,9 +370,15 @@ namespace Microsoft {
 m_modelSizeOfEachServer[i] = i < m_totalModelSize % m_totalClientNumber ? m_totalModelSize / m_totalClientNumber + 1 : m_totalModelSize / m_totalClientNumber;
 idx += m_modelSizeOfEachServer[i];
 }
+printf("create worker table \n");
 m_sharedArray = new multiverso::ArrayWorker<ElemType>(m_totalModelSize);
+printf("create worker table successed %d\n", m_sharedArray->raw().size());
+printf("create server table \n");
 m_serverArray = new multiverso::ArrayServer<ElemType>(m_totalModelSize);
+printf("create server table successed %d\n", m_sharedArray->raw().size());
+g_mpi->WaitAll();
 multiverso::MultiversoBarrier();
 //multiverso::SetTable(table_id, m_totalClientNumber, ((size_t)(m_totalModelSize / m_totalClientNumber)) + 1, sizeof(ElemType) == 4 ? "float" : "double");
 idx = 0;
@@ -375,16 +391,16 @@ namespace Microsoft {
 #ifndef CPUONLY
 //pinned memory
 for (int i = 0; i < m_localCacheNumber; ++i)
-CudaErrorCheck(cudaMallocHost((void **)&m_cpuAsyncBuffer[i], sizeof(ElemType) * (m_totalModelSize + 1), cudaHostAllocPortable));
-CudaErrorCheck(cudaMallocHost((void **)&m_deltaArray, sizeof(ElemType) * (m_totalModelSize + 1), cudaHostAllocPortable));
+CudaErrorCheck(cudaMallocHost((void **)&m_cpuAsyncBuffer[i], sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable));
+CudaErrorCheck(cudaMallocHost((void **)&m_deltaArray, sizeof(ElemType) * (m_totalModelSize), cudaHostAllocPortable));
 //GPU memory cache
 for (int i = 0; i < m_localCacheNumber; i++)
 m_gpuAsyncBuffer[i] = new Matrix<ElemType>*[m_tableCount];
 #else
 for (int i = 0; i < m_localCacheNumber; i++)
-m_cpuAsyncBuffer[i] = new ElemType[m_totalModelSize + 1];
+m_cpuAsyncBuffer[i] = new ElemType[m_totalModelSize];
 #endif
 //multiverso::Init(localWorkerNumber);
@@ -398,6 +414,7 @@ namespace Microsoft {
 ////m_multiversoAdaptor = new multiverso::Adaptor(adaptor_id, 0);
 //printf("%s@rank %d/%d: Initialized Adaptor.\n",
 // getenv("COMPUTERNAME"), multiverso::GetMPIRank(), multiverso::GetMPISize());
+multiverso::Log::ResetLogLevel(multiverso::LogLevel::Debug);
 fflush(stdout);
 }