This commit is contained in:
Qiwei Ye 2016-01-07 16:52:04 +08:00
Родитель 937816eff2
Коммит 641c75a751
1 изменённых файлов: 12 добавлений и 19 удалений

Просмотреть файл

@ -1,6 +1,5 @@
#pragma once
// This uses Multiverso.h which requires
// the header files in ..\..\Multiverso\include
// and the lib files in ..\..\Multiverso\x64
@ -50,7 +49,6 @@ namespace Microsoft {
{
typedef shared_ptr<ComputationNode<ElemType>> ComputationNodePtr;
public:
// Temporarily declared public for barrier().
// TODO: move to private
multiverso::Adaptor * _adaptor;
thread * _pThread;
@ -70,6 +68,7 @@ namespace Microsoft {
_isInitialized = false;
_nClients = localWorkerNumber;
//Pipeline related variables
_isPipeline = isPipeline;
_nLocalCache = _isPipeline ? 2 : 1;
@ -90,7 +89,7 @@ namespace Microsoft {
for (int i = 0; i < _nLocalCache; i++)
_pCacheState[i] = (i + 1) % _nLocalCache;
_pThread = new thread();
_pThread = new thread();
_pSizeEachServer = new size_t[_nClients];
_pIdxEachServer = new size_t[_nClients];
@ -99,10 +98,13 @@ namespace Microsoft {
~MultiversoWrapper()
{
if (_isPipeline && _pThread->joinable())
fprintf(stderr, "~MultiversoWrapper\n");
fflush(stderr);
if (_isPipeline && _pThread != nullptr && _pThread->joinable())
_pThread->join();
delete _pCacheState, /*_pPCache, */_pDelta, _pSizeEachServer, _pIdxEachServer;
delete _pCacheState, _pDelta, _pSizeEachServer, _pIdxEachServer;
for (size_t i = 0; i < _nLocalCache; i++)
{
@ -112,18 +114,16 @@ namespace Microsoft {
delete _pPCache[i];
#endif
}
//CudaErrorCheck(cudaFreeHost(_pCache));
delete _pPCache;
#ifndef CPUONLY
CudaErrorCheck(cudaStreamDestroy(_commStream));
//todo: delete _pMatri
#endif
multiverso::FinishTrain();
multiverso::Close(false);
}
// With Multiverso, parameters are stored on the server.
// This function uploads the parameters into Multiverso.
void ModelInit(const std::list<ComputationNodeBasePtr> & learnableNodes)
{
float factor = (float) 1.0 / _nClients;
@ -144,7 +144,7 @@ namespace Microsoft {
ElemType* px = _pPCache[0] + _vTableIdx[i];
mat.CopyToArray(px, _vTableLength[i]);
}
for (int i = 1; i < _nLocalCache; i++)
memcpy(_pPCache[i], _pPCache[0], sizeof(ElemType) * _lTotalLength);
@ -168,7 +168,6 @@ namespace Microsoft {
_commCnt++;
Timer timer;
//float factor = (float) 1.0 / _nClients;
int table_id = 0;
if (_isPipeline && _pThread->joinable())
_pThread->join();
@ -185,14 +184,12 @@ namespace Microsoft {
Microsoft::MSR::CNTK::Matrix<ElemType> &mat = node->Value();
#ifndef CPUONLY
//CNTK model -> GPU buffer
//*_pPMatrixCache[_nCacheIdx][i] = mat;
CudaErrorCheck(cudaMemcpy(_pPMatrixCache[_nCacheIdx][i]->BufferPointer(),
mat.BufferPointer(),
mat.GetNumElements() * sizeof(ElemType),
cudaMemcpyDeviceToDevice));
//GPU buffer -> CNTK model
//mat = *_pPMatrixCache[_pCacheState[_nCacheIdx]][i];
CudaErrorCheck(cudaMemcpy(mat.BufferPointer(),
_pPMatrixCache[_pCacheState[_nCacheIdx]][i]->BufferPointer(),
mat.GetNumElements() * sizeof(ElemType),
@ -201,9 +198,9 @@ namespace Microsoft {
ElemType * px = _pPCache[_nCacheIdx] + _vTableIdx[i];
mat.CopyToArray(px, _vTableLength[i]);
ElemType * py = _pPCache[_pCacheState[_nCacheIdx]] + _vTableIdx[i];
mat.SetValue(mat.GetNumRows(), mat.GetNumCols(), mat.GetDeviceId(), py);
@ -234,19 +231,16 @@ namespace Microsoft {
//Calculate delta
transform(_pDelta, _pDelta + _lTotalLength, _pPCache[t_cacheIdx], _pDelta, std::minus<ElemType>());
//transform(_pDelta, _pDelta + _lTotalLength, _pCache, _pDelta, std::minus<ElemType>());
//////Communication
for (int row = 0; row < _nClients; row++)
_adaptor->Add(table_id, row, _pDelta + _pIdxEachServer[row], factor);
//_adaptor->Clock();
_adaptor->BatchLoad(table_id, _pPCache[t_cacheIdx], _pIdxEachServer, _pSizeEachServer);
//CPU buffer -> GPU buffer
for (int widx = 0; widx < _nTableCnt; widx++)
{
ElemType * py = _pPCache[t_cacheIdx] + _vTableIdx[widx];
//ElemType * py = _pCache + _vTableIdx[widx];
CudaErrorCheck(cudaMemcpyAsync(_pPMatrixCache[t_cacheIdx][widx]->BufferPointer(),
py,
@ -314,6 +308,7 @@ namespace Microsoft {
multiverso::SetCommType("p2p");
multiverso::SetSyncType("async");
multiverso::SetLog(true);
int table_id = 0;
//weights
@ -347,7 +342,6 @@ namespace Microsoft {
#ifndef CPUONLY
//pinned memory
//CudaErrorCheck(cudaMallocHost((void **)&_pCache, sizeof(ElemType) * _lTotalLength, cudaHostAllocPortable));
for (int i = 0; i < _nLocalCache; ++i)
CudaErrorCheck(cudaMallocHost((void **)&_pPCache[i], sizeof(ElemType) * (_lTotalLength + 1), cudaHostAllocPortable));
@ -414,7 +408,6 @@ namespace Microsoft {
vector<size_t> _vTableIdx;
ElemType * _pDelta;
ElemType ** _pPCache;
//ElemType *_pCache;
size_t * _pSizeEachServer;
size_t * _pIdxEachServer;