diff --git a/next/Test/main.cpp b/next/Test/main.cpp index 918cfa2..3b89d67 100644 --- a/next/Test/main.cpp +++ b/next/Test/main.cpp @@ -18,41 +18,6 @@ using namespace multiverso; -void TestMatrix(int argc, char* argv[]){ - Log::Info("Test Matrix\n"); - MV_Init(&argc, argv); - MatrixWorkerTable* worker_table = new MatrixWorkerTable(10, 10); - MatrixServerTable* server_table = new MatrixServerTable(10, 10); - MV_Barrier(); - - std::vector delta(10 * 10); - - for (int i = 0; i < 100; ++i) - delta[i] = 0; - - int * data = new int[10 * 10]; - - for (int i = 0; i < 100; ++i) - delta[i] = 1.0; - - std::vector v = { 0, 1, 5 }; - - worker_table->Add(v, delta.data()); - worker_table->Add(-1, delta.data()); - - worker_table->Get(-1, data); - - printf("-----------rank %d begin-----------\n", Zoo::Get()->rank()); - for (int i = 0; i < 10; ++i){ - for (int j = 0; j < 10; ++j) - printf("%d ", data[i*10+j]); - printf("\n"); - } - MV_Barrier(); - - MV_ShutDown(); -} - void TestKV(int argc, char* argv[]) { Log::Info("Test KV map \n"); // ----------------------------------------------------------------------- // @@ -358,6 +323,69 @@ void TestNoNet(int argc, char* argv[]) { MV_ShutDown(); } +void TestMatrix(int argc, char* argv[]){ + Log::Info("Test Matrix\n"); + + MV_Init(&argc, argv); + + int num_row = 11, num_col = 10; + int size = num_row * num_col; + + MatrixWorkerTable* worker_table = new MatrixWorkerTable(num_row, num_col); + MatrixServerTable* server_table = new MatrixServerTable(num_row, num_col); + + MV_Barrier(); + + std::vector v = { 0, 1, 5 }; + /* test data + std::vector delta(size); + for (int i = 0; i < size; ++i) + delta[i] = 1; + + int * data = new int[size]; + + worker_table->Add(v, delta.data()); + worker_table->Add(-1, delta.data()); + + worker_table->Get(-1, data); + MV_Barrier(); + + printf("----------------------------\n"); + for (int i = 0; i < num_row; ++i){ + printf("rank %d output row %d: ", Zoo::Get()->rank(), i); + for (int j = 0; j < num_col; ++j) + printf("%d ", data[i * num_col + j]); + printf("\n"); + } + */ + + //test data_vec + std::vector delta(num_row); + std::vector data(num_row); + for (int i = 0; i < num_row; ++i){ + delta[i] = new int[num_col]; + data[i] = new int[num_col]; + for (int j = 0; j < num_col; ++j){ + delta[i][j] = 1; + } + } + + worker_table->Add(v, &delta); + worker_table->Add(-1, &delta); + worker_table->Get(10, &data); + MV_Barrier(); + + printf("----------------------------\n"); + for (int i = 0; i < num_row; ++i){ + printf("rank %d output row %d: ", Zoo::Get()->rank(), i); + for (int j = 0; j < num_col; ++j) + printf("%d ", data[i][j]); + printf("\n"); + } + + MV_ShutDown(); +} + void TestComm(int argc, char* argv[]) { diff --git a/next/include/multiverso/table/matrix_table.h b/next/include/multiverso/table/matrix_table.h index 66ce3c8..85044b7 100644 --- a/next/include/multiverso/table/matrix_table.h +++ b/next/include/multiverso/table/matrix_table.h @@ -13,11 +13,17 @@ namespace multiverso { class MatrixWorkerTable : public WorkerTable { public: explicit MatrixWorkerTable(int num_row, int num_col) : WorkerTable(), num_row_(num_row), num_col_(num_col) { + data_ = nullptr; + data_vec_ = nullptr; + + row_size_ = num_col * sizeof(T); + num_server_ = Zoo::Get()->num_servers(); + //compute row offsets in all servers server_offsets_.push_back(0); int length = num_row / num_server_; for (int i = 1; i < num_server_; ++i) { - server_offsets_.push_back(i * length); // may not balance + server_offsets_.push_back(i * length); } server_offsets_.push_back(num_row); @@ -26,6 +32,9 @@ namespace multiverso { T* raw() { return data_; } + std::vector* row_vec() { return data_vec_; } + + // get one row, -1 for all // data is user-allocated memory void Get(int row_id, T* data){ data_ = data; @@ -33,14 +42,12 @@ namespace multiverso { Log::Debug("worker %d getting row with id %d.\n", Zoo::Get()->rank(), row_id); } - void Add(int row_id, T* data) { - if (row_id == -1){ - WorkerTable::Add(Blob(&row_id, sizeof(int)), Blob(data, sizeof(T) * num_col_ * num_row_)); - } - else{ - WorkerTable::Add(Blob(&row_id, sizeof(int)), Blob(data, sizeof(T)* num_col_)); - } - Log::Debug("worker %d adding row with id %d.\n", Zoo::Get()->rank(), row_id); + // get one row, -1 for all + // data_vec is user-allocated memory + void Get(int row_id, std::vector* data_vec){ + data_vec_ = data_vec; + WorkerTable::Get(Blob(&row_id, sizeof(int))); + Log::Debug("worker %d getting row with id %d.\n", Zoo::Get()->rank(), row_id); } // data is user-allocated memory @@ -50,10 +57,60 @@ namespace multiverso { Log::Debug("worker %d getting rows\n", Zoo::Get()->rank()); } - // Add some rows + void Get(std::vector row_ids, std::vector* data_vec) { + data_vec_ = data_vec; + WorkerTable::Get(Blob(&row_ids[0], sizeof(int)* row_ids.size())); + Log::Debug("worker %d getting rows\n", Zoo::Get()->rank()); + } + + // Add one row with row_id, add all with row_id = -1 + void Add(int row_id, T* data) { + CHECK_NOTNULL(data); + + if (row_id == -1){ + WorkerTable::Add(Blob(&row_id, sizeof(int)), Blob(data, row_size_ * num_row_)); + } + else{ + WorkerTable::Add(Blob(&row_id, sizeof(int)), Blob(data, row_size_)); + } + Log::Debug("worker %d adding row with id %d.\n", Zoo::Get()->rank(), row_id); + } + + // Add one row with row_id, add all with row_id = -1 + void Add(int row_id, std::vector* data_vec) { + CHECK_NOTNULL(data_vec); + + if (row_id == -1){ + Blob whole_table = Blob(&row_id, sizeof(int)); + Blob values = Blob(num_row_ * row_size_); + //copy each row + int offset = 0; + for (int i = 0; i < num_row_; ++i){ + memcpy(values.data() + offset, (*data_vec)[i], row_size_); + offset += row_size_; + } + WorkerTable::Add(whole_table, values); + } + else{ + WorkerTable::Add(Blob(&row_id, sizeof(int)), Blob((*data_vec)[row_id], row_size_)); + } + Log::Debug("worker %d adding row with id %d.\n", Zoo::Get()->rank(), row_id); + } + void Add(std::vector row_ids, T* data) { Blob ids_blob(&row_ids[0], sizeof(int)* row_ids.size()); - Blob data_blob(data, sizeof(T)* row_ids.size() * num_col_); + Blob data_blob(data, row_ids.size() * row_size_); + WorkerTable::Add(ids_blob, data_blob); + Log::Debug("worker %d adding rows\n", Zoo::Get()->rank()); + } + + void Add(std::vector row_ids, std::vector* data_vec) { + Blob ids_blob(&row_ids[0], sizeof(int)* row_ids.size()); + Blob data_blob(row_ids.size() * row_size_); + //copy each row + for (int i = 0; i < row_ids.size(); ++i){ + memcpy(data_blob.data() + i * row_size_, (*data_vec)[i], row_size_); + } WorkerTable::Add(ids_blob, data_blob); Log::Debug("worker %d adding rows\n", Zoo::Get()->rank()); } @@ -68,35 +125,36 @@ namespace multiverso { for (int i = 0; i < num_server_; ++i){ (*out)[i].push_back(kv[0]); } - if (kv.size() == 2){ + if (kv.size() == 2){ //process add values for (int i = 0; i < num_server_; ++i){ - Blob blob(kv[1].data() + server_offsets_[i] * num_col_ * sizeof(T), - (server_offsets_[i + 1] - server_offsets_[i]) * num_col_ * sizeof(T)); + Blob blob(kv[1].data() + server_offsets_[i] * row_size_, + (server_offsets_[i + 1] - server_offsets_[i]) * row_size_); (*out)[i].push_back(blob); } } return static_cast(out->size()); } - + //count row number in each server Blob row_ids = kv[0]; std::unordered_map count; + int num_row_each = num_row_ / num_server_; for (int i = 0; i < row_ids.size(); ++i){ - int dst = row_ids.As(i) / (num_row_ / num_server_); + int dst = row_ids.As(i) / num_row_each; dst = (dst == num_server_ ? dst - 1 : dst); ++count[dst]; } for (auto& it : count) { // Allocate memory std::vector& vec = (*out)[it.first]; vec.push_back(Blob(it.second * sizeof(int))); - if (kv.size() == 2) vec.push_back(Blob(it.second * sizeof(T)* num_col_)); + if (kv.size() == 2) vec.push_back(Blob(it.second * row_size_)); } count.clear(); for (int i = 0; i < row_ids.size(); ++i) { - int dst = row_ids.As(i) / (num_row_ / num_server_); + int dst = row_ids.As(i) / num_row_each; dst = (dst == num_server_ ? dst - 1 : dst); (*out)[dst][0].As(count[dst]) = row_ids.As(i); - if (kv.size() == 2){ - memcpy(&((*out)[dst][1].As(count[dst] * num_col_)), &(kv[1].As(i * num_col_)), num_col_ * sizeof(T)); + if (kv.size() == 2){//copy add values + memcpy(&((*out)[dst][1].As(count[dst] * num_col_)), &(kv[1].As(i * num_col_)), row_size_); } ++count[dst]; } @@ -104,28 +162,51 @@ namespace multiverso { } void ProcessReplyGet(std::vector& reply_data) override { - CHECK(reply_data.size() == 2 || reply_data.size() == 3); - Blob keys = reply_data[0], data = reply_data[1]; + CHECK(reply_data.size() == 2 || reply_data.size() == 3);//3 for get all rows + Blob keys = reply_data[0], data = reply_data[1]; + //get all rows if (keys.size() == 1 && keys.As(0) == -1) { - int row_offset = reply_data[2].As(); - memcpy(data_ + row_offset * num_col_, data.data(), data.size()); + int server_id = reply_data[2].As(); + if (data_ != nullptr){ //copy into data_ + memcpy(data_ + server_offsets_[server_id] * num_col_, data.data(), data.size()); + } + else { //into data vector + int offset_d = 0; + int offset_v = server_offsets_[server_id]; + int num_row = server_offsets_[server_id + 1] - offset_v; + for (int i = 0; i < num_row; ++i){ + memcpy((*data_vec_)[offset_v + i], data.data() + offset_d, row_size_); + offset_d += row_size_; + } + } return; } CHECK(data.size() == keys.size() * sizeof(T)* num_col_); - for (int i = 0; i < keys.size(); ++i) { - int row_id = keys.As(i); - memcpy(data_ + row_id * num_col_, data.data() + i * num_col_ * sizeof(T), num_col_ * sizeof(T)); + int offset = 0; + if (data_ != nullptr){ + for (int i = 0; i < keys.size(); ++i) { + memcpy(data_ + keys.As(i) * num_col_, data.data() + offset, row_size_); + offset += row_size_; + } + } + else { + for (int i = 0; i < keys.size(); ++i) { + memcpy((*data_vec_)[keys.As(i)], data.data() + offset, row_size_); + offset += row_size_; + } } } private: T* data_; // not owned + std::vector* data_vec_; //not owned int num_row_; int num_col_; + int row_size_; int num_server_; - std::vector server_offsets_; + std::vector server_offsets_; }; // TODO(feiga): rename. The name static is inherited from last version @@ -153,7 +234,7 @@ namespace multiverso { #else CHECK(data.size() == 2); Blob values = data[1], keys = data[0]; - + // add all values if (keys.size() == 1 && keys.As() == -1){ CHECK(storage_.size() == values.size()); for (int i = 0; i < storage_.size(); ++i){ @@ -164,12 +245,13 @@ namespace multiverso { } CHECK(values.size() == keys.size() * sizeof(T)* num_col_); + int offset_v = 0; for (int i = 0; i < keys.size(); ++i) { - int offset_v = i * num_col_; int offset_s = (keys.As(i) -row_offset_) * num_col_; for (int j = 0; j < num_col_; ++j){ storage_[j + offset_s] += values.As(offset_v + j); } + offset_v += num_col_; Log::Debug("server %d adding row with id %d\n", server_id_, keys.As(i)); } #endif @@ -179,14 +261,14 @@ namespace multiverso { std::vector* result) override { CHECK(data.size() == 1); CHECK_NOTNULL(result); - + Blob keys = data[0]; result->push_back(keys); // also push the key //get all rows if (keys.size() == 1 && keys.As() == -1){ result->push_back(Blob(storage_.data(), sizeof(T)* storage_.size())); - result->push_back(Blob(&row_offset_, sizeof(int))); + result->push_back(Blob(&server_id_, sizeof(int))); Log::Debug("server %d getting all rows with row offset %d with %d rows\n", server_id_, row_offset_, storage_.size() / num_col_); return; } @@ -194,10 +276,12 @@ namespace multiverso { result->push_back(Blob(keys.size() * sizeof(T)* num_col_)); Blob& vals = (*result)[1]; + int row_size = sizeof(T)* num_col_; + int offset_v = 0; for (int i = 0; i < keys.size(); ++i) { - int offset_v = i * num_col_; int offset_s = (keys.As(i) -row_offset_) * num_col_; - memcpy(&(vals.As(offset_v)), &storage_[offset_s], sizeof(T)*num_col_); + memcpy(&(vals.As(offset_v)), &storage_[offset_s], row_size); + offset_v += num_col_; Log::Debug("server %d getting row with id %d\n", server_id_, keys.As(i)); } }