This commit is contained in:
feiga 2016-03-04 21:52:14 +08:00
Родитель 5d548574b3
Коммит 975ef6b46a
8 изменённых файлов: 279 добавлений и 313 удалений

Просмотреть файл

@ -70,10 +70,12 @@
<ClInclude Include="include\multiverso\table\adam_array_table.h"> <ClInclude Include="include\multiverso\table\adam_array_table.h">
<Filter>include\table</Filter> <Filter>include\table</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="include\multiverso\table\matrix_table.h" />
<ClInclude Include="include\multiverso\util\quantization_util.h"> <ClInclude Include="include\multiverso\util\quantization_util.h">
<Filter>util</Filter> <Filter>util</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="include\multiverso\table\matrix_table.h">
<Filter>include\table</Filter>
</ClInclude>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Filter Include="include"> <Filter Include="include">

Просмотреть файл

@ -345,10 +345,10 @@ void TestMatrix(int argc, char* argv[]){
int * data = new int[size]; int * data = new int[size];
worker_table->Add(v, delta.data()); //add row 0,1,5,10 // worker_table->Add(v, delta.data()); //add row 0,1,5,10
worker_table->Add(delta.data()); //add all worker_table->Add(delta.data(), size); //add all
worker_table->Get(data); //get all worker_table->Get(data, size); //get all
MV_Barrier(); MV_Barrier();
printf("----------------------------\n"); printf("----------------------------\n");
@ -360,21 +360,21 @@ void TestMatrix(int argc, char* argv[]){
} }
MV_Barrier(); MV_Barrier();
//test data_vec ////test data_vec
std::vector<int*> data_rows = { &data[0], &data[num_col], &data[5 * num_col], &data[10*num_col] }; //std::vector<int*> data_rows = { &data[0], &data[num_col], &data[5 * num_col], &data[10*num_col] };
std::vector<int*> delta_rows = { &delta[0], &delta[num_col], &delta[5 * num_col], &delta[10 * num_col] }; //std::vector<int*> delta_rows = { &delta[0], &delta[num_col], &delta[5 * num_col], &delta[10 * num_col] };
worker_table->Add(v, delta_rows); //worker_table->Add(v, delta_rows, num_col);
worker_table->Get(v, data_rows); //worker_table->Get(v, data_rows, num_col);
MV_Barrier(); //MV_Barrier();
printf("----------------------------\n"); //printf("----------------------------\n");
for (int i = 0; i < num_row; ++i){ //for (int i = 0; i < num_row; ++i){
printf("rank %d, row %d: ", MV_Rank(), i); // printf("rank %d, row %d: ", MV_Rank(), i);
for (int j = 0; j < num_col; ++j) // for (int j = 0; j < num_col; ++j)
printf("%d ", data[i * num_col + j]); // printf("%d ", data[i * num_col + j]);
printf("\n"); // printf("\n");
} //}
MV_Barrier(); //MV_Barrier();
MV_ShutDown(); MV_ShutDown();
} }

Просмотреть файл

@ -22,6 +22,11 @@ public:
} }
// Construct from external memory. Will copy a new piece // Construct from external memory. Will copy a new piece
Blob(const void* data, size_t size) : size_(size) {
data_.reset(new char[size]);
memcpy(data_.get(), data, size_);
}
Blob(void* data, size_t size) : size_(size) { Blob(void* data, size_t size) : size_(size) {
data_.reset(new char[size]); data_.reset(new char[size]);
memcpy(data_.get(), data, size_); memcpy(data_.get(), data, size_);

Просмотреть файл

@ -32,7 +32,7 @@ int MV_Server_Id();
// Init Multiverso Net with the provided endpoint. Multiverso Net will bind // Init Multiverso Net with the provided endpoint. Multiverso Net will bind
// the provided endpoint and use this endpoint to listen and recv message // the provided endpoint and use this endpoint to listen and recv message
// \param rank the rank of this MV process // \param rank the rank of this MV process
// \param endpoint endpoint with format ip:port, e.g., 127.0.0.1:9999 // \param endpoint endpoint with format ip:port, e.g., localhost:9999
// \return 0 SUCCESS // \return 0 SUCCESS
// \return -1 FAIL // \return -1 FAIL
int MV_Net_Bind(int rank, char* endpoint); int MV_Net_Bind(int rank, char* endpoint);

Просмотреть файл

@ -85,7 +85,7 @@ public:
name().c_str(), rank(), size()); name().c_str(), rank(), size());
} }
void Finalize() override { MPI_Finalize(); } void Finalize() override { inited_ = 0; MPI_Finalize(); }
int Bind(int rank, char* endpoint) override { int Bind(int rank, char* endpoint) override {
Log::Fatal("Shouldn't call this in MPI Net\n"); Log::Fatal("Shouldn't call this in MPI Net\n");
@ -97,6 +97,7 @@ public:
return -1; return -1;
} }
bool active() const { return inited_ != 0; }
int rank() const override { return rank_; } int rank() const override { return rank_; }
int size() const override { return size_; } int size() const override { return size_; }
std::string name() const override { return "MPI"; } std::string name() const override { return "MPI"; }

Просмотреть файл

@ -1,9 +1,9 @@
#ifndef MULTIVERSO_MATRIX_TABLE_H_ #ifndef MULTIVERSO_MATRIX_TABLE_H_
#define MULTIVERSO_MATRIX_TABLE_H_ #define MULTIVERSO_MATRIX_TABLE_H_
#include "multiverso/multiverso.h"
#include "multiverso/table_interface.h" #include "multiverso/table_interface.h"
#include "multiverso/util/log.h" #include "multiverso/util/log.h"
#include "multiverso/zoo.h"
#include <vector> #include <vector>
@ -12,14 +12,12 @@ namespace multiverso {
template <typename T> template <typename T>
class MatrixWorkerTable : public WorkerTable { class MatrixWorkerTable : public WorkerTable {
public: public:
explicit MatrixWorkerTable(int num_row, int num_col) : WorkerTable(), num_row_(num_row), num_col_(num_col) { MatrixWorkerTable(int num_row, int num_col) :
data_ = nullptr; WorkerTable(), num_row_(num_row), num_col_(num_col) {
data_vec_ = nullptr;
row_size_ = num_col * sizeof(T); row_size_ = num_col * sizeof(T);
get_reply_count_ = 0; get_reply_count_ = 0;
num_server_ = Zoo::Get()->num_servers(); num_server_ = MV_Num_Servers();
//compute row offsets in all servers //compute row offsets in all servers
server_offsets_.push_back(0); server_offsets_.push_back(0);
int length = num_row / num_server_; int length = num_row / num_server_;
@ -28,77 +26,58 @@ namespace multiverso {
} }
server_offsets_.push_back(num_row); server_offsets_.push_back(num_row);
Log::Debug("worker %d create matrixTable with %d rows %d colums.\n", Zoo::Get()->rank(), num_row, num_col); Log::Debug("worker %d create matrixTable with %d rows %d colums.\n",
MV_Rank(), num_row, num_col);
} }
T* raw() { return data_; }
std::vector<T*>* row_vec() { return data_vec_; }
// get whole table // get whole table
// data is user-allocated memory // data is user-allocated memory
void Get(T* data){ void Get(T* data, size_t size){
data_ = data; CHECK(size == num_col_ * num_row_);
int whole = -1; int whole_table = -1;
WorkerTable::Get(Blob(&whole, sizeof(int))); Get(whole_table, data, size);
Log::Debug("worker %d getting whole table.\n", Zoo::Get()->rank());
} }
// data is user-allocated memory // data is user-allocated memory
void Get(std::vector<int>& row_ids, T* data) { void Get(int row_id, T* data, size_t size) {
data_ = data; if (row_id >= 0) CHECK(size == num_col_);
WorkerTable::Get(Blob(&row_ids[0], sizeof(int)* row_ids.size())); row_index_[row_id] = data; // data_ = data;
Log::Debug("worker %d getting rows\n", Zoo::Get()->rank()); WorkerTable::Get(Blob(&row_id, sizeof(int)));
Log::Debug("worker %d getting row %d\n", MV_Rank(), row_id);
} }
void Get(std::vector<int>& row_ids, std::vector<T*>& data_vec) { void Get(const std::vector<int>& row_ids,
data_vec_ = &data_vec; const std::vector<T*>& data_vec,
size_t size) {
CHECK(size == num_col_);
CHECK(row_ids.size() == data_vec.size());
for (int i = 0; i < row_ids.size(); ++i){ for (int i = 0; i < row_ids.size(); ++i){
row_index_[row_ids[i]] = i; row_index_[row_ids[i]] = data_vec[i];
} }
WorkerTable::Get(Blob(&row_ids[0], sizeof(int) * row_ids.size())); WorkerTable::Get(Blob(row_ids.data(), sizeof(int) * row_ids.size()));
Log::Debug("worker %d getting rows\n", Zoo::Get()->rank()); Log::Debug("worker %d getting rows, request rows number = %d\n",
MV_Rank(), row_ids.size());
} }
// Add whole table // Add whole table
void Add(T* data) { void Add(T* data, size_t size) {
CHECK_NOTNULL(data); CHECK(size == num_col_ * num_row_);
int whole = -1; int whole_table = -1;
WorkerTable::Add(Blob(&whole, sizeof(int)), Blob(data, row_size_ * num_row_)); Add(whole_table, data, size);
Log::Debug("worker %d adding whole table.\n", Zoo::Get()->rank());
} }
/* void Add(int row_id, T* data, size_t size) {
// Add one row with row_id, add all with row_id = -1 if (row_id >= 0) CHECK(size == num_col_);
void Add(int row_id, std::vector<T*>* data_vec) { Blob ids_blob(&row_id, sizeof(int));
CHECK_NOTNULL(data_vec); Blob data_blob(data, size * sizeof(T));
if (row_id == -1){
Blob whole_table = Blob(&row_id, sizeof(int));
Blob values = Blob(num_row_ * row_size_);
//copy each row
int offset = 0;
for (int i = 0; i < num_row_; ++i){
memcpy(values.data() + offset, (*data_vec)[i], row_size_);
offset += row_size_;
}
WorkerTable::Add(whole_table, values);
}
else{
WorkerTable::Add(Blob(&row_id, sizeof(int)), Blob((*data_vec)[row_id], row_size_));
}
Log::Debug("worker %d adding row with id %d.\n", Zoo::Get()->rank(), row_id);
}
*/
void Add(std::vector<int>& row_ids, T* data) {
Blob ids_blob(&row_ids[0], sizeof(int)* row_ids.size());
Blob data_blob(data, row_ids.size() * row_size_);
WorkerTable::Add(ids_blob, data_blob); WorkerTable::Add(ids_blob, data_blob);
Log::Debug("worker %d adding rows\n", Zoo::Get()->rank()); Log::Debug("worker %d adding rows\n", MV_Rank());
} }
void Add(std::vector<int>& row_ids, const std::vector<T*>& data_vec) { void Add(const std::vector<int>& row_ids,
const std::vector<T*>& data_vec,
size_t size) {
CHECK(size == num_col_);
Blob ids_blob(&row_ids[0], sizeof(int)* row_ids.size()); Blob ids_blob(&row_ids[0], sizeof(int)* row_ids.size());
Blob data_blob(row_ids.size() * row_size_); Blob data_blob(row_ids.size() * row_size_);
//copy each row //copy each row
@ -106,7 +85,7 @@ namespace multiverso {
memcpy(data_blob.data() + i * row_size_, data_vec[i], row_size_); memcpy(data_blob.data() + i * row_size_, data_vec[i], row_size_);
} }
WorkerTable::Add(ids_blob, data_blob); WorkerTable::Add(ids_blob, data_blob);
Log::Debug("worker %d adding rows\n", Zoo::Get()->rank()); Log::Debug("worker %d adding rows\n", MV_Rank());
} }
int Partition(const std::vector<Blob>& kv, int Partition(const std::vector<Blob>& kv,
@ -114,7 +93,6 @@ namespace multiverso {
CHECK(kv.size() == 1 || kv.size() == 2); CHECK(kv.size() == 1 || kv.size() == 2);
CHECK_NOTNULL(out); CHECK_NOTNULL(out);
//get all elements, only happends in data_
if (kv[0].size<int>() == 1 && kv[0].As<int>(0) == -1){ if (kv[0].size<int>() == 1 && kv[0].As<int>(0) == -1){
for (int i = 0; i < num_server_; ++i){ for (int i = 0; i < num_server_; ++i){
(*out)[i].push_back(kv[0]); (*out)[i].push_back(kv[0]);
@ -154,11 +132,13 @@ namespace multiverso {
dst = (dst == num_server_ ? dst - 1 : dst); dst = (dst == num_server_ ? dst - 1 : dst);
(*out)[dst][0].As<int>(count[dst]) = row_ids.As<int>(i); (*out)[dst][0].As<int>(count[dst]) = row_ids.As<int>(i);
if (kv.size() == 2){ // copy add values if (kv.size() == 2){ // copy add values
memcpy(&((*out)[dst][1].As<T>(count[dst] * num_col_)), &(kv[1].As<T>(i * num_col_)), row_size_); memcpy(&((*out)[dst][1].As<T>(count[dst] * num_col_)),
&(kv[1].As<T>(i * num_col_)), row_size_);
} }
++count[dst]; ++count[dst];
} }
if (kv.size() == 1){ if (kv.size() == 1){
CHECK(get_reply_count_ == 0); CHECK(get_reply_count_ == 0);
get_reply_count_ = static_cast<int>(out->size()); get_reply_count_ = static_cast<int>(out->size());
@ -168,75 +148,62 @@ namespace multiverso {
void ProcessReplyGet(std::vector<Blob>& reply_data) override { void ProcessReplyGet(std::vector<Blob>& reply_data) override {
CHECK(reply_data.size() == 2 || reply_data.size() == 3); //3 for get all rows CHECK(reply_data.size() == 2 || reply_data.size() == 3); //3 for get all rows
Blob keys = reply_data[0], data = reply_data[1]; Blob keys = reply_data[0], data = reply_data[1];
//get all rows, only happen in data_ //get all rows, only happen in data_
if (keys.size<int>() == 1 && keys.As<int>(0) == -1) { if (keys.size<int>() == 1 && keys.As<int>() == -1) {
CHECK(data_ != nullptr);
int server_id = reply_data[2].As<int>(); int server_id = reply_data[2].As<int>();
memcpy(data_ + server_offsets_[server_id] * num_col_, data.data(), data.size()); CHECK_NOTNULL(row_index_[-1]);
if ((--get_reply_count_) == 0) data_ = nullptr; //in case of wrong operation to user data memcpy(row_index_[-1] + server_offsets_[server_id] * num_col_,
return; data.data(), data.size());
} }
else {
CHECK(data.size() == keys.size<int>() * row_size_); CHECK(data.size() == keys.size<int>() * row_size_);
int offset = 0; int offset = 0;
if (data_ != nullptr){
for (int i = 0; i < keys.size<int>(); ++i) { for (int i = 0; i < keys.size<int>(); ++i) {
memcpy(data_ + keys.As<int>(i) * num_col_, data.data() + offset, row_size_); CHECK_NOTNULL(row_index_[keys.As<int>(i)]);
memcpy(row_index_[keys.As<int>(i)], data.data() + offset, row_size_);
offset += row_size_; offset += row_size_;
} }
if ((--get_reply_count_) == 0) data_ = nullptr;
}
else { //data_vec_
CHECK(data_vec_ != nullptr);
for (int i = 0; i < keys.size<int>(); ++i) {
memcpy((*data_vec_)[row_index_[keys.As<int>(i)]], data.data() + offset, row_size_);
offset += row_size_;
}
if ((--get_reply_count_) == 0){
data_vec_ = nullptr;
row_index_.clear();
}
} }
//in case of wrong operation to user data
if (--get_reply_count_ == 0) { row_index_.clear(); }
} }
private: private:
T* data_; // not owned // T* data_; // not owned
std::vector<T*>* data_vec_; //not owned // std::vector<T*>* data_vec_; // not owned
std::unordered_map<int, int> row_index_; //index of data with row id in data_vec_ std::unordered_map<int, T*> row_index_; // index of data with row id in data_vec_
int get_reply_count_; // number of unprocessed get reply int get_reply_count_; // number of unprocessed get reply
int num_row_; int num_row_;
int num_col_; int num_col_;
int row_size_; // = sizeof(T) * num_col_ int row_size_; // equals to sizeof(T) * num_col_
int num_server_; int num_server_;
std::vector<int> server_offsets_; // row id offset std::vector<int> server_offsets_; // row id offset
}; };
// TODO(feiga): rename. The name static is inherited from last version
// The storage is a continuous large chunk of memory
template <typename T> template <typename T>
class MatrixServerTable : public ServerTable { class MatrixServerTable : public ServerTable {
public: public:
explicit MatrixServerTable(int num_row, int num_col) : ServerTable(), num_col_(num_col) { explicit MatrixServerTable(int num_row, int num_col) :
server_id_ = Zoo::Get()->rank(); ServerTable(), num_col_(num_col) {
int size = num_row / Zoo::Get()->num_servers(); server_id_ = MV_Server_Id();
row_offset_ = size * Zoo::Get()->rank(); CHECK(server_id_ != -1);
if (server_id_ == Zoo::Get()->num_servers() - 1){
int size = num_row / MV_Num_Servers();
row_offset_ = size * MV_Rank(); // Zoo::Get()->rank();
if (server_id_ == MV_Num_Servers() - 1){
size = num_row - row_offset_; size = num_row - row_offset_;
} }
storage_.resize(size * num_col); storage_.resize(size * num_col);
Log::Debug("server %d create matrixTable with %d row %d colums of %d rows.\n", server_id_, num_row, num_col, size); Log::Debug("server %d create matrix table with %d row %d colums of %d rows.\n",
server_id_, num_row, num_col, size);
} }
void ProcessAdd(const std::vector<Blob>& data) override { void ProcessAdd(const std::vector<Blob>& data) override {
#ifdef MULTIVERSO_USE_BLAS
// MKL update
Log::Fatal("Not implemented yet\n");
#else
CHECK(data.size() == 2); CHECK(data.size() == 2);
Blob values = data[1], keys = data[0]; Blob values = data[1], keys = data[0];
// add all values // add all values
@ -245,11 +212,12 @@ namespace multiverso {
for (int i = 0; i < storage_.size(); ++i){ for (int i = 0; i < storage_.size(); ++i){
storage_[i] += values.As<T>(i); storage_[i] += values.As<T>(i);
} }
Log::Debug("server %d adding all rows with row offset %d with %d rows\n", server_id_, row_offset_, storage_.size() / num_col_); Log::Debug("server %d adding all rows with row offset %d with %d rows\n",
server_id_, row_offset_, storage_.size() / num_col_);
return; return;
} }
CHECK(values.size() == keys.size<int>() * sizeof(T)* num_col_); CHECK(values.size() == keys.size<int>() * sizeof(T)* num_col_);
int offset_v = 0; int offset_v = 0;
for (int i = 0; i < keys.size<int>(); ++i) { for (int i = 0; i < keys.size<int>(); ++i) {
int offset_s = (keys.As<int>(i) -row_offset_) * num_col_; int offset_s = (keys.As<int>(i) -row_offset_) * num_col_;
@ -257,9 +225,9 @@ namespace multiverso {
storage_[j + offset_s] += values.As<T>(offset_v + j); storage_[j + offset_s] += values.As<T>(offset_v + j);
} }
offset_v += num_col_; offset_v += num_col_;
Log::Debug("server %d adding row with id %d\n", server_id_, keys.As<int>(i)); Log::Debug("server %d adding row with id %d\n",
server_id_, keys.As<int>(i));
} }
#endif
} }
void ProcessGet(const std::vector<Blob>& data, void ProcessGet(const std::vector<Blob>& data,
@ -274,11 +242,11 @@ namespace multiverso {
if (keys.size<int>() == 1 && keys.As<int>() == -1){ if (keys.size<int>() == 1 && keys.As<int>() == -1){
result->push_back(Blob(storage_.data(), sizeof(T)* storage_.size())); result->push_back(Blob(storage_.data(), sizeof(T)* storage_.size()));
result->push_back(Blob(&server_id_, sizeof(int))); result->push_back(Blob(&server_id_, sizeof(int)));
Log::Debug("server %d getting all rows with row offset %d with %d rows\n", server_id_, row_offset_, storage_.size() / num_col_); Log::Debug("server %d getting all rows with row offset %d with %d rows\n",
server_id_, row_offset_, storage_.size() / num_col_);
return; return;
} }
result->push_back(Blob(keys.size<int>() * sizeof(T)* num_col_)); result->push_back(Blob(keys.size<int>() * sizeof(T)* num_col_));
Blob& vals = (*result)[1]; Blob& vals = (*result)[1];
int row_size = sizeof(T)* num_col_; int row_size = sizeof(T)* num_col_;

Просмотреть файл

@ -48,6 +48,7 @@ private:
class ServerTable { class ServerTable {
public: public:
ServerTable(); ServerTable();
virtual ~ServerTable() {}
virtual void ProcessAdd(const std::vector<Blob>& data) = 0; virtual void ProcessAdd(const std::vector<Blob>& data) = 0;
virtual void ProcessGet(const std::vector<Blob>& data, virtual void ProcessGet(const std::vector<Blob>& data,
std::vector<Blob>* result) = 0; std::vector<Blob>* result) = 0;

Просмотреть файл

@ -25,19 +25,8 @@ void Zoo::Start(int* argc, char** argv, int role) {
nodes_[rank()].rank = rank(); nodes_[rank()].rank = rank();
nodes_[rank()].role = role; nodes_[rank()].role = role;
mailbox_.reset(new MtQueue<MessagePtr>); mailbox_.reset(new MtQueue<MessagePtr>);
// These actors can either reside in one process, or in different process
// based on the configuration
// For example, we can have a kind of configuration, N machines, each have a
// worker actor(thread), a server actor. Meanwhile, rank 0 node also servers
// as controller.
// We can also have another configuration, N machine, rank 0 acts as
// controller, rank 1...M as workers(M < N), and rank M... N-1 as servers
// All nodes have a communicator, and one(at least one) or more of other three
// kinds of actors
// Log::Debug("Rank %d: Initializing Comunicator.\n", rank()); // NOTE(feiga): the start order is non-trivial, communicator should be last.
// NOTE(feiga): the start order is non-trivial
if (rank() == 0) { Actor* controler = new Controller(); controler->Start(); } if (rank() == 0) { Actor* controler = new Controller(); controler->Start(); }
if (node::is_server(role)) { Actor* server = new Server(); server->Start(); } if (node::is_server(role)) { Actor* server = new Server(); server->Start(); }
if (node::is_worker(role)) { Actor* worker = new Worker(); worker->Start(); } if (node::is_worker(role)) { Actor* worker = new Worker(); worker->Start(); }
@ -72,7 +61,7 @@ void Zoo::Receive(MessagePtr& msg) {
} }
void Zoo::RegisterNode() { void Zoo::RegisterNode() {
MessagePtr msg(new Message()); // = std::make_unique<Message>(); MessagePtr msg(new Message());
msg->set_src(rank()); msg->set_src(rank());
msg->set_dst(0); msg->set_dst(0);
msg->set_type(MsgType::Control_Register); msg->set_type(MsgType::Control_Register);
@ -91,9 +80,9 @@ void Zoo::RegisterNode() {
} }
void Zoo::Barrier() { void Zoo::Barrier() {
MessagePtr msg(new Message()); // = std::make_unique<Message>(); MessagePtr msg(new Message());
msg->set_src(rank()); msg->set_src(rank());
msg->set_dst(0); // rank 0 acts as the controller master. TODO(feiga): msg->set_dst(0); // rank 0 acts as the controller master.
// consider a method to encapsulate this node information // consider a method to encapsulate this node information
msg->set_type(MsgType::Control_Barrier); msg->set_type(MsgType::Control_Barrier);
SendTo(actor::kCommunicator, msg); SendTo(actor::kCommunicator, msg);