This commit is contained in:
Young 2016-05-25 14:24:25 +09:00
Родитель a4ca62e0c7 bbe9e05679
Коммит a282461cca
3 изменённых файлов: 146 добавлений и 203 удалений

Просмотреть файл

@ -5,6 +5,7 @@
#include <ctime>
#include <algorithm>
#include <numeric>
#include <memory>
#include <mpi.h>
@ -24,7 +25,6 @@
#include <multiverso/table_factory.h>
#include <gtest/gtest.h>
#include <memory>
using namespace multiverso;
@ -85,40 +85,36 @@ void TestKV(int argc, char* argv[]) {
// Array-table test driver: sets the "sync" flag, calls MV_Init, creates an
// array table, then loops Add/Get on it and exits with status 1 when a fetched
// element differs from the expected accumulated value.
// NOTE(review): this span looks like a flattened diff -- array_size,
// shared_array, delta, data and iter are each declared twice with different
// types/values, so pre- and post-change lines are presumably interleaved.
// Confirm which variant is current before relying on any single line.
void TestArray(int argc, char* argv[]) {
Log::Info("Test Array \n");
// The flag is set before MV_Init is called.
multiverso::SetCMDFlag("sync", true);
MV_Init(&argc, argv);
// Number of elements in the table (two competing values are visible here).
size_t array_size = 50000000;
size_t array_size = 5;
ArrayWorker<float>* shared_array = MV_CreateTable(ArrayTableOption<float>(array_size));
//ArrayWorker<float>* shared_array = new ArrayWorker<float>(50000000);
//ArrayServer<float>* server_array = new ArrayServer<float>(50000000);
ArrayWorker<int>* shared_array = MV_CreateTable(ArrayTableOption<int>(array_size));
MV_Barrier();
Log::Info("Create tables OK\n");
Log::Info("Create tables OK. Rank = %d, worker_id = %d\n", MV_Rank(), MV_WorkerId());
// delta[i] is filled with its own index i.
std::vector<float> delta(array_size);
std::vector<int> delta(array_size);
// Note: the loop index is int while array_size is size_t (signed/unsigned compare).
for (int i = 0; i < array_size; ++i)
delta[i] = static_cast<float>(i);
// Raw buffer that receives the table contents from Get; no matching delete[]
// is visible in this function.
float* data = new float[array_size];
delta[i] = static_cast<int>(i);
int* data = new int[array_size];
// Number of Add/Get rounds.
int iter = 1000;
int iter = 1000000000;
for (int i = 0; i < iter; ++i) {
// std::vector<float>& vec = shared_array->raw();
// shared_array->Get();
// Push the delta, then fetch the table contents into data.
shared_array->Add(delta.data(), array_size);
shared_array->Get(data, array_size);
// Prints the first 10 entries. Only the first statement on the next line is
// the loop body; the newline is written once after the loop.
// NOTE(review): with array_size == 5 this would read past the end of data -- confirm.
for (int j = 0; j < 10; ++j)
std::cout << data[j] << " "; std::cout << std::endl;
// Options passed to the three-argument Add below; the learning rate shrinks
// linearly with the iteration index i.
AddOption option;
option.set_learning_rate(1 - 0.0001 * i);
option.set_momentum(0.99);
option.set_rho(0.01f);
shared_array->Add(delta.data(), array_size, &option);
shared_array->Add(delta.data(), array_size, &option);
// Check every element against delta[k] * (i + 1) * MV_NumWorkers(); on the
// first mismatch, print the iteration and index, dump the array, and exit(1).
for (int k = 0; k < array_size; ++k) {
if (data[k] != delta[k] * (i + 1) * MV_NumWorkers()) {
std::cout << "i + 1 = " << i + 1 << " k = " << k << std::endl;
for (int j = 0; j < array_size; ++j) {
std::cout << data[j] << " ";
}
exit(1);
}
}
// Progress report every 1000 iterations.
if (i % 1000 == 0) { printf("iter = %d\n", i); fflush(stdout); }
}
MV_ShutDown();
}
@ -199,21 +195,99 @@ void TestIP() {
for (auto ip : ip_list) Log::Info("%s\n", ip.c_str());
}
void TestMatrix(int argc, char* argv[]){
Log::Info("Test Matrix\n");
//void TestMatrix(int argc, char* argv[]){
// Log::Info("Test Matrix\n");
//
// Log::ResetLogLevel(LogLevel::Info);
// multiverso::SetCMDFlag("sync", true);
// MV_Init(&argc, argv);
//
// int num_row = 8, num_col = 3592;
// int size = num_row * num_col;
// // MatrixWorkerTable<int>* worker_table = new MatrixWorkerTable<int>(num_row, num_col);
// // MatrixServerTable<int>* server_table = new MatrixServerTable<int>(num_row, num_col);
// MatrixTableOption<int> option;
// option.num_row = num_row;
// option.num_col = num_col;
// MatrixWorkerTable<int>* worker_table = multiverso::MV_CreateTable(option);
// std::thread* m_prefetchThread = nullptr;
// MV_Barrier();
// int count = 0;
// while (true)
// {
// count++;
// std::vector<int> v = { 0, 1, 3, 7 };
//
// // test data
// std::vector<int> delta(size);
// std::vector<int> data(size, 0);
// for (auto i = 0; i < size; ++i)
// delta[i] = (int)i;
//
// worker_table->Add(delta.data(), size);
// worker_table->Get(data.data(), size);
// if (count % 1000 == 0)
// {
// printf("Dense Add/Get, #test: %d.\n", count);
// fflush(stdout);
// }
//
// std::vector<int*> data_rows = { &data[0], &data[num_col], &data[3 * num_col], &data[7 * num_col] };
// std::vector<int*> delta_rows = { &delta[0], &delta[num_col], &delta[3 * num_col], &delta[7 * num_col] };
// worker_table->Add(v, delta_rows, num_col);
// worker_table->Get(v, data_rows, num_col);
// //MV_Barrier();
// //worker_table->Get(v, data_rows, num_col);
//
// if (count % 1000 == 0)
// {
// printf("Sparse Add/Get, #test: %d.\n", count);
// fflush(stdout);
// }
// for (auto i = 0; i < num_row; ++i) {
// for (auto j = 0; j < num_col; ++j) {
// int expected = (int)(i * num_col + j) * count * MV_NumWorkers();
// if (i == 0 || i == 1 || i == 3 || i == 7) {
// expected += (int)(i * num_col + j) * count * MV_NumWorkers();
// }
// int actual = data[i* num_col + j];
// ASSERT_EQ(expected, actual) << "Should be equal after adding, row: "
// << i << ", col:" << j << ", expected: " << expected << ", actual: " << actual;
// }
// }
//
// }
// delete worker_table;
// MV_ShutDown();
////}
Log::ResetLogLevel(LogLevel::Info);
void TestMatrix(int argc, char* argv[]){
// Log::ResetLogLevel(LogLevel::Debug);
multiverso::SetCMDFlag("sync", true);
MV_Init(&argc, argv);
int num_row = 8, num_col = 3592;
int size = num_row * num_col;
// MatrixWorkerTable<int>* worker_table = new MatrixWorkerTable<int>(num_row, num_col);
// MatrixServerTable<int>* server_table = new MatrixServerTable<int>(num_row, num_col);
MatrixTableOption<int> option;
option.num_row = num_row;
option.num_col = num_col;
MatrixWorkerTable<int>* worker_table = multiverso::MV_CreateTable(option);
int num_tables = 5;
std::vector<int> num_table_size;
std::vector<MatrixTableOption<int>* > table_options;
std::vector<MatrixWorkerTable<int>* > worker_tables;
for (auto i = 0; i < num_tables - 1 ; i++)
{
table_options.push_back(new MatrixTableOption<int>());
table_options[i]->num_col = num_col;
table_options[i]->num_row = num_row + i;
num_table_size.push_back(num_col * (num_row + i));
worker_tables.push_back(multiverso::MV_CreateTable(*table_options[i]));
}
table_options.push_back(new MatrixTableOption<int>());
table_options[4]->num_col = num_col;
table_options[4]->num_row = 1;
num_table_size.push_back(num_col * (1));
worker_tables.push_back(multiverso::MV_CreateTable(*table_options[4]));
std::thread* m_prefetchThread = nullptr;
MV_Barrier();
int count = 0;
@ -223,25 +297,36 @@ void TestMatrix(int argc, char* argv[]){
std::vector<int> v = { 0, 1, 3, 7 };
// test data
std::vector<int> delta(size);
std::vector<int> data(size, 0);
for (auto i = 0; i < size; ++i)
delta[i] = (int)i;
std::vector<std::vector<int>> delta(num_tables);
std::vector<std::vector<int>> data(num_tables);
for (auto j =0; j < num_tables; j++)
{
delta[j].resize(num_table_size[j]);
data[j].resize(num_table_size[j], 0);
for (auto i = 0; i < num_table_size[j]; ++i)
delta[j][i] = (int)i;
}
worker_table->Add(delta.data(), size);
worker_table->Get(data.data(), size);
for (auto j = 0; j < num_tables; j++)
{
worker_tables[j]->Add(delta[j].data(), num_table_size[j]);
worker_tables[j]->Get(data[j].data(), num_table_size[j]);
}
if (count % 1000 == 0)
{
printf("Dense Add/Get, #test: %d.\n", count);
fflush(stdout);
}
std::vector<int*> data_rows = { &data[0], &data[num_col], &data[3 * num_col], &data[7 * num_col] };
std::vector<int*> delta_rows = { &delta[0], &delta[num_col], &delta[3 * num_col], &delta[7 * num_col] };
worker_table->Add(v, delta_rows, num_col);
worker_table->Get(v, data_rows, num_col);
MV_Barrier();
worker_table->Get(v, data_rows, num_col);
std::vector<int*> data_rows = { &data[0][0], &data[0][num_col], &data[0][3 * num_col], &data[0][7 * num_col] };
std::vector<int*> delta_rows = { &delta[0][0], &delta[0][num_col], &delta[0][3 * num_col], &delta[0][7 * num_col] };
for (auto j = 0; j < num_tables - 1; j++)
{
worker_tables[j]->Add(v, delta_rows, num_col);
worker_tables[j]->Get(v, data_rows, num_col);
}
//MV_Barrier();
//worker_table->Get(v, data_rows, num_col);
if (count % 1000 == 0)
{
@ -254,18 +339,16 @@ void TestMatrix(int argc, char* argv[]){
if (i == 0 || i == 1 || i == 3 || i == 7) {
expected += (int)(i * num_col + j) * count * MV_NumWorkers();
}
int actual = data[i* num_col + j];
int actual = data[0][i* num_col + j];
ASSERT_EQ(expected, actual) << "Should be equal after adding, row: "
<< i << ", col:" << j << ", expected: " << expected << ", actual: " << actual;
}
}
MV_Barrier();
}
delete worker_table;
worker_tables.clear();
MV_ShutDown();
}
void TestCheckPoint(int argc, char* argv[], bool restore){
Log::Info("Test CheckPoint\n");
@ -326,7 +409,7 @@ void TestmatrixPerformance(int argc, char* argv[],
Log::Info("Test Matrix\n");
Timer timmer;
multiverso::SetCMDFlag("sync", true);
//multiverso::SetCMDFlag("sync", true);
MV_Init(&argc, argv);
int per = 0;
int num_row = 1000000, num_col = 50;

Просмотреть файл

@ -83,8 +83,7 @@ public:
// Builds a clock from n: local_clock_ is constructed with (n, 0), and both
// global_clock_ and size_ start at 0.
// NOTE(review): local_clock_ is presumably a std::vector<int> holding one
// entry per worker -- its declaration is not visible here, confirm.
explicit VectorClock(int n) :
local_clock_(n, 0), global_clock_(0), size_(0) {}
static bool except_max_int_compare(int a, int b)
{
// Less-than predicate that ignores a right-hand operand equal to
// std::numeric_limits<int>::max(): returns false whenever b is that maximum,
// otherwise the plain result of a < b.
static bool except_max_int_compare(int a, int b) {
  if (b == std::numeric_limits<int>::max()) {
    return false;
  }
  return a < b;
}
@ -102,7 +101,6 @@ public:
return false;
}
virtual bool FinishTrain(int i) {
local_clock_[i] = std::numeric_limits<int>::max();
if (global_clock_ < *(std::min_element(std::begin(local_clock_),
@ -150,7 +148,7 @@ protected:
CHECK(msg_get_cache_.TryPop(get_msg));
int get_worker = Zoo::Get()->rank_to_worker_id(get_msg->src());
Server::ProcessGet(get_msg);
worker_get_clocks_->Update(get_worker);
CHECK(!worker_get_clocks_->Update(get_worker));
}
}
}
@ -159,7 +157,9 @@ protected:
// 1. Before get: cache faster worker
int worker = Zoo::Get()->rank_to_worker_id(msg->src());
if (worker_add_clocks_->local_clock(worker) >
worker_add_clocks_->global_clock()) {
worker_add_clocks_->global_clock() ||
worker_get_clocks_->local_clock(worker) >
worker_get_clocks_->global_clock()) {
// Will wait for other worker finished Add
msg_get_cache_.Push(msg);
return;
@ -168,21 +168,20 @@ protected:
Server::ProcessGet(msg);
// 3. After get: process cached process add if necessary
if (worker_get_clocks_->Update(worker)) {
CHECK(msg_get_cache_.Empty());
while (!msg_add_cache_.Empty()) {
MessagePtr add_msg;
CHECK(msg_add_cache_.TryPop(add_msg));
int add_worker = Zoo::Get()->rank_to_worker_id(add_msg->src());
Server::ProcessAdd(add_msg);
worker_add_clocks_->Update(add_worker);
CHECK(!worker_add_clocks_->Update(add_worker));
}
}
}
void ProcessFinishTrain(MessagePtr& msg) {
int worker = Zoo::Get()->rank_to_worker_id(msg->src());
Log::Debug("[ProcessFinishTrain] Server %d, worker %d has finished training.\n", Zoo::Get()->server_rank(), worker);
Log::Debug("[ProcessFinishTrain] Server %d, worker %d has finished training.\n",
Zoo::Get()->server_rank(), worker);
if (worker_get_clocks_->FinishTrain(worker)) {
CHECK(msg_get_cache_.Empty());
while (!msg_add_cache_.Empty()) {
@ -193,7 +192,6 @@ protected:
worker_add_clocks_->Update(add_worker);
}
}
if (worker_add_clocks_->FinishTrain(worker)) {
CHECK(msg_add_cache_.Empty());
while (!msg_get_cache_.Empty()) {
@ -214,142 +212,6 @@ private:
MtQueue<MessagePtr> msg_get_cache_;
};
// TODO[qiwye]: Backup logic needs to be fixed
//class WithBackupSyncServer : public Server {
//public:
// WithBackupSyncServer() : Server() {
// num_worker_ = Zoo::Get()->num_workers();
// double backup_ratio = (double)MV_CONFIG_backup_worker_ratio / 100;
// num_sync_worker_ = num_worker_ -
// static_cast<int>(backup_ratio * num_worker_);
// CHECK(num_sync_worker_ > 0 && num_sync_worker_ <= num_worker_);
// if (num_sync_worker_ == num_worker_) {
// Log::Info("No backup worker, using the sync mode\n");
// }
// Log::Info("Sync with backup worker start: num_sync_worker = %d,"
// "num_total_worker = %d\n", num_sync_worker_, num_worker_);
// worker_get_clocks_.reset(new VectorClock(num_worker_));
// worker_add_clocks_.reset(new VectorClock(
// num_worker_, num_worker_ - num_sync_worker_));
// }
//
// // Modified to suit the sync server.
// // Please do not use it elsewhere; it may differ from the general vector clock.
// class VectorClock {
// public:
// VectorClock(int num_worker, int num_backup_worker = 0) :
// local_clock_(num_worker, 0), global_clock_(0), num_worker_(num_worker),
// num_sync_worker_(num_worker - num_backup_worker), progress_(0) {}
//
// // Return true when global clock meet the sync condition
// // sync: all worker reach the same clock
// // backup-worker-sync: sync-workers reach the same clock
// virtual bool Update(int i) {
// if (local_clock_[i]++ == global_clock_) {
// ++progress_;
// }
// if (progress_ >= num_sync_worker_) {
// ++global_clock_;
// progress_ = 0;
// for (auto i : local_clock_) {
// if (i > global_clock_) ++progress_;
// }
// if (global_clock_ == *(std::max_element(std::begin(local_clock_),
// std::end(local_clock_)))) {
// return true;
// }
// }
// return false;
// }
//
// std::string DebugString() {
// std::string os = "global ";
// os += std::to_string(global_clock_) + " local: ";
// for (auto i : local_clock_) os += std::to_string(i) + " ";
// return os;
// }
//
// int local_clock(int i) const { return local_clock_[i]; }
// int global_clock() const { return global_clock_; }
//
// protected:
// std::vector<int> local_clock_;
// int global_clock_;
// int num_worker_;
// int num_sync_worker_;
// int progress_;
// };
//protected:
// void ProcessAdd(MessagePtr& msg) override {
// // 1. Before add: cache faster worker
// int worker = Zoo::Get()->rank_to_worker_id(msg->src());
// if (worker_get_clocks_->local_clock(worker) >
// worker_get_clocks_->global_clock()) {
// msg_add_cache_.Push(msg);
// return;
// }
// // 2. Process Add
// if (worker_add_clocks_->local_clock(worker) >=
// worker_add_clocks_->global_clock()) {
// Server::ProcessAdd(msg);
// }
// // 3. After add: process cached process get if necessary
// if (worker_add_clocks_->Update(worker)) {
// CHECK(msg_add_cache_.Empty());
// while (!msg_get_cache_.Empty()) {
// MessagePtr get_msg;
// CHECK(msg_get_cache_.TryPop(get_msg));
// int get_worker = Zoo::Get()->rank_to_worker_id(get_msg->src());
// Server::ProcessGet(get_msg);
// worker_get_clocks_->Update(get_worker);
// }
// }
// }
//
// void ProcessGet(MessagePtr& msg) override {
// // 1. Before get: cache faster worker
// int worker = Zoo::Get()->rank_to_worker_id(msg->src());
// if (worker_add_clocks_->local_clock(worker) >
// worker_add_clocks_->global_clock()) {
// // Will wait for other worker finished Add
// msg_get_cache_.Push(msg);
// return;
// }
// // 2. Process Get
// Server::ProcessGet(msg);
// // 3. After get: process cached process add if necessary
// if (worker_get_clocks_->Update(worker)) {
// CHECK(msg_get_cache_.Empty());
// while (!msg_add_cache_.Empty()) {
// MessagePtr add_msg;
// CHECK(msg_add_cache_.TryPop(add_msg));
// int add_worker = Zoo::Get()->rank_to_worker_id(add_msg->src());
// if (worker_add_clocks_->local_clock(add_worker) >=
// worker_add_clocks_->global_clock()) {
// Server::ProcessAdd(msg);
// };
// worker_add_clocks_->Update(add_worker);
// }
// }
// }
//
// void ProcessFinish(MessagePtr& msg) {
//
// }
//
//private:
// std::unique_ptr<VectorClock> worker_get_clocks_;
// std::unique_ptr<VectorClock> worker_add_clocks_;
//
// MtQueue<MessagePtr> msg_add_cache_;
// MtQueue<MessagePtr> msg_get_cache_;
//
// // num_worker_ - num_sync_worker_ = num_backup_worker_
// int num_sync_worker_;
// int num_worker_;
//};
Server* Server::GetServer() {
if (!MV_CONFIG_sync) {
Log::Info("Create a async server\n");

Просмотреть файл

@ -61,17 +61,15 @@ void Worker::ProcessAdd(MessagePtr& msg) {
int num = cache_[table_id]->Partition(msg->data(), &partitioned_kv);
cache_[table_id]->Reset(msg_id, num);
for (auto i = 0; i < Zoo::Get()->num_servers(); i++) {
//for (auto& it : partitioned_kv) {
int dst_rank = Zoo::Get()->server_id_to_rank(i);
MessagePtr msg(new Message());
msg->set_src(Zoo::Get()->rank());
//msg->set_dst(it.first);
msg->set_dst(i);
msg->set_dst(dst_rank);
msg->set_type(MsgType::Request_Add);
msg->set_msg_id(msg_id);
msg->set_table_id(table_id);
//msg->set_data(it.second);
if (partitioned_kv.find(i) != partitioned_kv.end())
msg->set_data(partitioned_kv[i]);
if (partitioned_kv.find(dst_rank) != partitioned_kv.end())
msg->set_data(partitioned_kv[dst_rank]);
SendTo(actor::kCommunicator, msg);
}
MONITOR_END(WORKER_PROCESS_ADD)