This commit is contained in:
Qiwei Ye 2016-04-19 23:30:18 +08:00
Родитель bc24e174ad
Коммит 51735f98d7
3 изменённых файлов: 85 добавлений и 70 удалений

Просмотреть файл

@ -12,6 +12,7 @@
#include <multiverso/util/net_util.h>
#include <multiverso/util/configure.h>
#include <multiverso/util/timer.h>
#include <multiverso/dashboard.h>
#include <multiverso/table/array_table.h>
#include <multiverso/table/kv_table.h>
@ -444,8 +445,7 @@ void TestSparseMatrixTable(int argc, char* argv[]) {
UpdateOption option;
option.set_worker_id(worker_id);
std::cout << "==> test get twice, the second get should be shorter than the first\
one." << std::endl;
std::cout << "==> test get twice, the second get should be shorter than the first one." << std::endl;
{
auto worker_table = std::shared_ptr<SparseMatrixWorkerTable<int>>(
new SparseMatrixWorkerTable<int>(num_row, num_col));
@ -454,12 +454,12 @@ void TestSparseMatrixTable(int argc, char* argv[]) {
MV_Barrier();
timmer.Restart();
timmer.Start();
worker_table->Get(data, size, worker_id);
std::cout << " " << timmer.elapse() << "s:\t" << "get all rows 1st time" << std::endl;
// do not need to get any rows, since all rows are up-to-date
timmer.Restart();
timmer.Start();
worker_table->Get(data, size, worker_id);
std::cout << " " << timmer.elapse() << "s:\t" << "get all rows 2nd time" << std::endl;
@ -474,7 +474,7 @@ void TestSparseMatrixTable(int argc, char* argv[]) {
new SparseMatrixWorkerTable<int>(num_row, num_col));
auto server_table = std::shared_ptr<SparseMatrixServerTable<int>>(
new SparseMatrixServerTable<int>(num_row, num_col, false));
timmer.Restart();
timmer.Start();
worker_table->Add(delta, size, &option);
worker_table->Get(data, size, -1);
std::cout << " " << timmer.elapse() << "s:\t" << "add 1 to all values, and get all rows after adding" << std::endl;
@ -541,7 +541,7 @@ void TestMatrixPerformance(int argc, char* argv[], bool sparse) {
}
}
}
timmer.Restart();
timmer.Start();
worker_table->Get(data, size, worker_id);
std::cout << " " << timmer.elapse() << "s:\t" << "get all rows after adding to rows" << std::endl;
}
@ -577,13 +577,15 @@ void TestMatrixPerformance(int argc, char* argv[], bool sparse) {
}
}
}
timmer.reset();
timmer.Start();
worker_table->Get(data, size);
timmer.log("get all rows after adding to rows");
std::cout << " " << timmer.elapse() << "s:\t" << "get all rows after adding to rows" << std::endl;
}
}
Log::ResetLogLevel(LogLevel::Info); MV_Dashboard(); Log::ResetLogLevel(LogLevel::Error);
Log::ResetLogLevel(LogLevel::Info);
Dashboard::Display();
Log::ResetLogLevel(LogLevel::Error);
MV_Barrier();
MV_ShutDown();
}
@ -611,6 +613,8 @@ int main(int argc, char* argv[]) {
else if (strcmp(argv[1], "restore") == 0) TestCheckPoint(argc, argv, true);
else if (strcmp(argv[1], "allreduce") == 0) TestAllreduce(argc, argv);
else if (strcmp(argv[1], "sparsematrix") == 0) TestSparseMatrixTable(argc, argv);
else if (strcmp(argv[1], "testsparse0") == 0) TestMatrixPerformance(argc, argv, true);
else if (strcmp(argv[1], "testsparse1") == 0) TestMatrixPerformance(argc, argv, false);
else CHECK(false);
}
return 0;

Просмотреть файл

@ -43,7 +43,10 @@ namespace multiverso {
Blob compressed_blob;
auto compressed = try_compress(blob, &compressed_blob);
// size info (compressed ? size : -1)
#pragma warning( push )
#pragma warning( disable : 4267)
size_blob.As<index_type>(i) = compressed ? blob.size() : -1;
#pragma warning( pop )
outputs->push_back(compressed ? std::move(compressed_blob) : blob);
}
}
@ -67,71 +70,77 @@ namespace multiverso {
}
}
protected:
bool try_compress(const Blob& in_blob,
Blob* out_blob) {
CHECK_NOTNULL(out_blob);
CHECK(sizeof(data_type) == sizeof(index_type));
auto data_count = in_blob.size<data_type>();
auto non_zero_count = 0;
protected:
bool try_compress(const Blob& in_blob,
Blob* out_blob) {
CHECK_NOTNULL(out_blob);
#pragma warning( push )
#pragma warning( disable : 4127)
CHECK(sizeof(data_type) == sizeof(index_type));
#pragma warning( pop )
auto data_count = in_blob.size<data_type>();
auto non_zero_count = 0;
for (auto i = 0; i < data_count; ++i) {
if (std::abs(in_blob.As<data_type>(i)) > clip_value) {
++non_zero_count;
}
}
if (non_zero_count * 2 >= data_count)
return false;
if (non_zero_count == 0) {
// Blob does not support empty content,
// fill the blob with first value
Blob result(2 * sizeof(data_type));
// set index
result.As<index_type>(0) = 0;
// set value
result.As<data_type>(1) = in_blob.As<data_type>(0);
*out_blob = result;
}
else {
Blob result(non_zero_count * 2 * sizeof(data_type));
auto result_index = 0;
for (auto i = 0; i < data_count; ++i) {
if (std::abs(in_blob.As<data_type>(i)) > clip_value) {
++non_zero_count;
auto abs_value = std::abs(in_blob.As<data_type>(i));
if (abs_value > clip_value) {
// set index
result.As<index_type>(result_index++) = i;
// set value
result.As<data_type>(result_index++) =
in_blob.As<data_type>(i);
}
}
if (non_zero_count * 2 >= data_count)
return false;
if (non_zero_count == 0) {
// Blob does not support empty content,
// fill the blob with first value
Blob result(2 * sizeof(data_type));
// set index
result.As<index_type>(0) = 0;
// set value
result.As<data_type>(1) = in_blob.As<data_type>(0);
*out_blob = result;
}
else {
Blob result(non_zero_count * 2 * sizeof(data_type));
auto result_index = 0;
for (auto i = 0; i < data_count; ++i) {
auto abs_value = std::abs(in_blob.As<data_type>(i));
if (abs_value > clip_value) {
// set index
result.As<index_type>(result_index++) = i;
// set value
result.As<data_type>(result_index++) =
in_blob.As<data_type>(i);
}
}
CHECK(result_index == non_zero_count * 2);
*out_blob = result;
}
return true;
CHECK(result_index == non_zero_count * 2);
*out_blob = result;
}
Blob de_compress(const Blob& in_blob, size_t size) {
CHECK(sizeof(data_type) == sizeof(index_type));
CHECK(size % sizeof(data_type) == 0);
auto original_data_count = size / sizeof(data_type);
Blob result(size);
for (auto i = 0; i < original_data_count; ++i) {
result.As<data_type>(i) = 0;
}
auto data_count = in_blob.size<data_type>();
for (auto i = 0; i < data_count; i += 2) {
auto index = in_blob.As<index_type>(i);
auto value = in_blob.As<data_type>(i + 1);
result.As<data_type>(index) = value;
}
return true;
}
return result;
Blob de_compress(const Blob& in_blob, size_t size) {
#pragma warning( push )
#pragma warning( disable : 4127)
CHECK(sizeof(data_type) == sizeof(index_type));
#pragma warning( pop )
CHECK(size % sizeof(data_type) == 0);
auto original_data_count = size / sizeof(data_type);
Blob result(size);
for (auto i = 0; i < original_data_count; ++i) {
result.As<data_type>(i) = 0;
}
auto data_count = in_blob.size<data_type>();
for (auto i = 0; i < data_count; i += 2) {
auto index = in_blob.As<index_type>(i);
auto value = in_blob.As<data_type>(i + 1);
result.As<data_type>(index) = value;
}
return result;
}
private:
double clip_value;
};

Просмотреть файл

@ -10,7 +10,7 @@
namespace multiverso {
const bool split_rows = true;
bool split_rows = true;
// get whole table, data is user-allocated memory
template <typename T>
@ -93,7 +93,7 @@ int SparseMatrixWorkerTable<T>::Partition(const std::vector<Blob>& kv,
}
count.clear();
int offset = 0;
//int offset = 0;
for (int i = 0; i < keys_size; ++i) {
int dst = dest[i];
int rank = MV_ServerIdToRank(dst);
@ -261,8 +261,10 @@ void SparseMatrixServerTable<T>::ProcessGet(
// get worker_id from at the last position
auto workder_id = data[0].As<int>(keys_size);
std::vector<int> outdate_rows;
#pragma warning( push )
#pragma warning( disable : 4267)
update_state_on_get(workder_id, keys, keys_size, &outdate_rows);
#pragma warning( pop )
Blob outdate_rows_blob(sizeof(int) * outdate_rows.size());
for (auto i = 0; i < outdate_rows.size(); ++i) {
outdate_rows_blob.As<int>(i) = outdate_rows[i];