Merge branch 'feature/sparse-update' of https://github.com/Microsoft/multiverso into feature/sparse-update

Conflicts:
	Test/main.cpp
This commit is contained in:
Qiwei Ye 2016-04-21 12:38:54 +08:00
Родитель 8a20a9eaa4 bbec7359d9
Коммит bd9e4f4b70
1 изменённых файлов: 87 добавлений и 86 удалений

Просмотреть файл

@ -489,9 +489,16 @@ void TestSparseMatrixTable(int argc, char* argv[]) {
}
void TestMatrixPerformance(int argc, char* argv[], bool sparse = true) {
Log::ResetLogLevel(LogLevel::Error);
Log::Info("Test Sparse Matrix\n");
template<typename WT, typename ST>
void TestmatrixPerformance(int argc, char* argv[],
std::function<std::shared_ptr<WT>(int num_row, int num_col)>CreateWorkerTable,
std::function<std::shared_ptr<ST>(int num_row, int num_col)>CreateServerTable,
std::function<void(const std::shared_ptr<WT>& worker_table, const std::vector<int>& row_ids, const std::vector<int*>& data_vec, size_t size, const UpdateOption* option, int worker_id)> Add,
std::function<void(const std::shared_ptr<WT>& worker_table, int* data, size_t size, int worker_id)> Get) {
Log::ResetLogLevel(LogLevel::Info);
Log::Info("Test Matrix\n");
Timer timmer;
MV_Init(&argc, argv);
multiverso::SetCMDFlag("sync", true);
@ -503,7 +510,7 @@ void TestMatrixPerformance(int argc, char* argv[], bool sparse = true) {
for (int i = 3; i < 100; i++) // asuming 100
{
bool prime = true;
for (int j = 0; j<primes.size() && primes[j] * primes[j] <= i; j++)
for (int j = 0; j < primes.size() && primes[j] * primes[j] <= i; j++)
{
if (i % primes[j] == 0)
{
@ -527,11 +534,11 @@ void TestMatrixPerformance(int argc, char* argv[], bool sparse = true) {
int* keys = new int[num_row];
for (auto row = 0; row < num_row; ++row) {
for (auto col = 0; col < num_col; ++col) {
delta[row * num_col + col] = row + 2;
}
delta[row * num_col + col] = row * num_col + col;
}
}
UpdateOption option;
option.set_worker_id(worker_id);
@ -540,92 +547,87 @@ void TestMatrixPerformance(int argc, char* argv[], bool sparse = true) {
for (int i = 0; i < num_row; i++){
unique_index.push_back(i);
}
Timer timmer;
if (sparse) {
for (auto p = 0; p < 10; ++p)
for (auto p = 0; p < 10; ++p)
{
std::shuffle(unique_index.begin(), unique_index.end(), eng);
std::cout << "==> test add " << p + 1 << " /10 rows to matrix server" << std::endl;
auto worker_table = CreateWorkerTable(num_row, num_col);
auto server_table = CreateServerTable(num_row, num_col);
std::vector<int> row_ids;
std::vector<float*> data_vec;
for (auto i = 0; i < (p + 1) * num_row / 10; i++)
{
std::shuffle(unique_index.begin(), unique_index.end(), eng);
std::cout << "==> test add " << p + 1 << " /10 rows to *sparse* matrix server" << std::endl;
auto worker_table = std::shared_ptr<SparseMatrixWorkerTable<float>>(
new SparseMatrixWorkerTable<float>(num_row, num_col));
auto server_table = std::shared_ptr<SparseMatrixServerTable<float>>(
new SparseMatrixServerTable<float>(num_row, num_col, false));
std::vector<int> row_ids;
std::vector<float*> data_vec;
for (auto i = 0; i < (p+1) * num_row / 10; i++)
{
row_ids.push_back(unique_index[i]);
data_vec.push_back(delta + unique_index[i] * num_col);
}
std::cout << "row_ids : size " << row_ids.size() << std::endl;
worker_table->Add(row_ids, data_vec, num_col, &option);
worker_table->Get(data, size, -1);
for (auto i = 0; i < num_row; ++i) {
auto row_start = data + i * num_col;
for (auto col = 0; col < num_col; ++col) {
if (i % 10 <= p) {
auto expected = i + 2;
auto actual = *(row_start + col);
ASSERT_EQ(expected, actual) << "Should be updated after adding";
}
else {
ASSERT_EQ(0, *(row_start + col)) << "Should be 0 for non update row values";
}
row_ids.push_back(unique_index[i]);
data_vec.push_back(delta + unique_index[i] * num_col);
}
Add(worker_table, row_ids, data_vec, num_col, &option, worker_id);
Get(worker_table, data, size, -1);
for (auto i = 0; i < num_row; ++i) {
auto row_start = data + i * num_col;
for (auto col = 0; col < num_col; ++col) {
if (i % 10 <= p) {
auto expected = i * num_col + col;
auto actual = *(row_start + col);
// ASSERT_EQ(expected, actual) << "Should be updated after adding";
}
else {
// ASSERT_EQ(0, *(row_start + col)) << "Should be 0 for non update row values";
}
}
timmer.Start();
worker_table->Get(data, size, worker_id);
std::cout << " " << 1.0 * timmer.elapse() / 1000 << "s:\t" << "get all rows after adding to rows" << std::endl;
}
timmer.Start();
Get(worker_table, data, size, worker_id);
std::cout << " " << 1.0 * timmer.elapse() / 1000 << "s:\t" << "get all rows after adding to rows" << std::endl;
}
else {
for (auto p = 0; p < 10; ++p)
{
std::shuffle(unique_index.begin(), unique_index.end(), eng);
std::cout << "==> test add " << p + 1 << " /10 rows to matrix server" << std::endl;
auto worker_table = std::shared_ptr<MatrixWorkerTable<float>>(
new MatrixWorkerTable<float>(num_row, num_col));
auto server_table = std::shared_ptr<MatrixServerTable<float>>(
new MatrixServerTable<float>(num_row, num_col));
std::vector<int> row_ids;
std::vector<float*> data_vec;
// update (p+1)/10 rows with 1
for (auto i = 0; i < (p + 1) * num_row / 10; i++)
{
row_ids.push_back(unique_index[i]);
data_vec.push_back(delta + unique_index[i] * num_col);
}
std::cout << "row_ids : size " << row_ids.size() << std::endl;
worker_table->Add(row_ids, data_vec, num_col, &option);
worker_table->Get(data, size);
for (auto i = 0; i < num_row; ++i) {
auto row_start = data + i * num_col;
for (auto col = 0; col < num_col; ++col) {
if (i % 10 <= p) {
auto expected = i + 2;
auto actual = *(row_start + col);
ASSERT_EQ(expected, actual) << "Should be updated after adding";
}
else {
ASSERT_EQ(0, *(row_start + col)) << "Should be 0 for non update row values";
}
}
}
timmer.Start();
worker_table->Get(data, size);
std::cout << " " << 1.0 * timmer.elapse() / 1000 << "s:\t" << "get all rows after adding to rows" << std::endl;
}
}
Log::ResetLogLevel(LogLevel::Info);
Dashboard::Display();
Log::ResetLogLevel(LogLevel::Error);
MV_Barrier();
Log::ResetLogLevel(LogLevel::Info);
Dashboard::Display();
Log::ResetLogLevel(LogLevel::Error);
MV_ShutDown();
}
void TestSparsePerf(int argc, char* argv[]) {
TestmatrixPerformance<SparseMatrixWorkerTable<int>, SparseMatrixServerTable<int>>(argc,
argv,
[](int num_row, int num_col) {
return std::shared_ptr<SparseMatrixWorkerTable<int>>(
new SparseMatrixWorkerTable<int>(num_row, num_col));
},
[](int num_row, int num_col) {
return std::shared_ptr<SparseMatrixServerTable<int>>(
new SparseMatrixServerTable<int>(num_row, num_col, false));
},
[](const std::shared_ptr<SparseMatrixWorkerTable<int>>& worker_table, const std::vector<int>& row_ids, const std::vector<int*>& data_vec, size_t size, const UpdateOption* option, const int worker_id) {
worker_table->Add(row_ids, data_vec, size, option);
},
[](const std::shared_ptr<SparseMatrixWorkerTable<int>>& worker_table, int* data, size_t size, int worker_id) {
worker_table->Get(data, size, worker_id);
});
}
void TestDensePerf(int argc, char* argv[]) {
TestmatrixPerformance<MatrixWorkerTable<int>, MatrixServerTable<int>>(argc,
argv,
[](int num_row, int num_col) {
return std::shared_ptr<MatrixWorkerTable<int>>(
new MatrixWorkerTable<int>(num_row, num_col));
},
[](int num_row, int num_col) {
return std::shared_ptr<MatrixServerTable<int>>(
new MatrixServerTable<int>(num_row, num_col));
},
[](const std::shared_ptr<MatrixWorkerTable<int>>& worker_table, const std::vector<int>& row_ids, const std::vector<int*>& data_vec, size_t size, const UpdateOption* option, const int worker_id) {
worker_table->Add(row_ids, data_vec, size, option);
},
[](const std::shared_ptr<MatrixWorkerTable<int>>& worker_table, int* data, size_t size, int worker_id) {
worker_table->Get(data, size);
});
}
int main(int argc, char* argv[]) {
Log::ResetLogLevel(LogLevel::Debug);
@ -648,9 +650,8 @@ int main(int argc, char* argv[]) {
else if (strcmp(argv[1], "checkpoint") == 0) TestCheckPoint(argc, argv, false);
else if (strcmp(argv[1], "restore") == 0) TestCheckPoint(argc, argv, true);
else if (strcmp(argv[1], "allreduce") == 0) TestAllreduce(argc, argv);
else if (strcmp(argv[1], "sparsematrix") == 0) TestSparseMatrixTable(argc, argv);
else if (strcmp(argv[1], "perf-sparse") == 0) TestMatrixPerformance(argc, argv);
else if (strcmp(argv[1], "perf-dense") == 0) TestMatrixPerformance(argc, argv,false);
else if (strcmp(argv[1], "TestSparsePerf") == 0) TestSparsePerf(argc, argv);
else if (strcmp(argv[1], "TestDensePerf") == 0) TestDensePerf(argc, argv);
else CHECK(false);
}
return 0;