diff --git a/next/IMultiverso.vcxproj b/next/IMultiverso.vcxproj index f9345e6..50beb2d 100644 --- a/next/IMultiverso.vcxproj +++ b/next/IMultiverso.vcxproj @@ -192,7 +192,7 @@ - + diff --git a/next/IMultiverso.vcxproj.filters b/next/IMultiverso.vcxproj.filters index 9a78ae8..206782b 100644 --- a/next/IMultiverso.vcxproj.filters +++ b/next/IMultiverso.vcxproj.filters @@ -100,9 +100,6 @@ updater - - updater - updater @@ -115,6 +112,9 @@ updater + + updater + diff --git a/next/Test/main.cpp b/next/Test/main.cpp index 90b00bb..2f2aafa 100644 --- a/next/Test/main.cpp +++ b/next/Test/main.cpp @@ -102,6 +102,7 @@ void TestArray(int argc, char* argv[]) { UpdateOption option; option.set_learning_rate(1 - 0.0001 * i); option.set_momentum(0.99); + option.set_rho(0.01f); shared_array->Add(delta.data(), 1000000, &option); @@ -343,8 +344,8 @@ void TestMatrix(int argc, char* argv[]){ // Log::Debug("rank %d has no worker\n", MV_Rank()); // } - MatrixWorkerTable* worker_table = new MatrixWorkerTable(num_row, num_col); - MatrixServerTable* server_table = new MatrixServerTable(num_row, num_col); + MatrixWorkerTable* worker_table = new MatrixWorkerTable(num_row, num_col); + MatrixServerTable* server_table = new MatrixServerTable(num_row, num_col); std::thread* m_prefetchThread = nullptr; MV_Barrier(); @@ -359,21 +360,22 @@ void TestMatrix(int argc, char* argv[]){ std::vector v = { 0, 1, 5, 10 }; // test data - std::vector delta(size); + std::vector delta(size); for (int i = 0; i < size; ++i) delta[i] = i; - int * data = new int[size]; + float * data = new float[size]; m_prefetchThread = new std::thread([&](){ - worker_table->Add(delta.data(), size); //add all + UpdateOption option; + worker_table->Add(delta.data(), size, &option); //add all worker_table->Get(data, size); //get all printf("----------------------------\n"); for (int i = 0; i < num_row; ++i){ printf("rank %d, row %d: ", MV_Rank(), i); for (int j = 0; j < num_col; ++j) - printf("%d ", data[i * num_col + j]); + printf("%.2f ", data[i * num_col + j]); printf("\n"); }; }); @@ -386,9 +388,10 @@ void TestMatrix(int argc, char* argv[]){ m_prefetchThread = nullptr; } //test data_vec - std::vector data_rows = { &data[0], &data[num_col], &data[5 * num_col], &data[10 * num_col] }; - std::vector delta_rows = { &delta[0], &delta[num_col], &delta[5 * num_col], &delta[10 * num_col] }; - worker_table->Add(v, delta_rows, num_col); + std::vector data_rows = { &data[0], &data[num_col], &data[5 * num_col], &data[10 * num_col] }; + std::vector delta_rows = { &delta[0], &delta[num_col], &delta[5 * num_col], &delta[10 * num_col] }; + UpdateOption option; + worker_table->Add(v, delta_rows, num_col, &option); worker_table->Get(v, data_rows, num_col); MV_Barrier(); @@ -396,7 +399,7 @@ void TestMatrix(int argc, char* argv[]){ for (int i = 0; i < num_row; ++i){ printf("rank %d, row %d: ", MV_Rank(), i); for (int j = 0; j < num_col; ++j) - printf("%d ", data[i * num_col + j]); + printf("%.2f ", data[i * num_col + j]); printf("\n"); } MV_Barrier(); diff --git a/next/include/multiverso/updater/adagrad_updater.h b/next/include/multiverso/updater/adagrad_updater.h index 3ae5029..77ed15d 100644 --- a/next/include/multiverso/updater/adagrad_updater.h +++ b/next/include/multiverso/updater/adagrad_updater.h @@ -5,6 +5,8 @@ #include #include +#include + namespace multiverso { @@ -13,25 +15,41 @@ class AdaGradUpdater : public Updater { public: explicit AdaGradUpdater(size_t size): e(1e-6f), size_(size) { - historic_g_sqr_.resize(MV_NumWorkers()); - for (auto s : historic_g_sqr_){ - s.resize(size_); - } - Log::Debug("[AdaGradUpdater] Init with size = %d, e = %f. \n", size_, e); + historic_g_sqr_.resize(MV_NumWorkers(), std::vector(size_)); + Log::Debug("[AdaGradUpdater] Init with size = %d, e = %f. historic_size = %d\n", size_, e, historic_g_sqr_.size()); } + void Update(size_t num_element, T* data, T* delta, UpdateOption* option, size_t offset) override { + auto g_sqr_data_ = historic_g_sqr_.at(option->worker_id()); for (size_t index = 0; index < num_element; ++index) { - - historic_g_sqr_[option->worker_id()][index + offset] -= + g_sqr_data_[index + offset] -= delta[index] * delta[index] / option->learning_rate() / option->learning_rate(); + //[TODO(qiwye)] sqrt take too much time data[index + offset] -= option->rho() / - std::sqrt(historic_g_sqr_[option->worker_id()][index + offset] + e) * + std::sqrt(g_sqr_data_[index + offset] + e) * delta[index] / option->learning_rate(); + + //data[index + offset] -= option->rho() * + // QuakeRsqrt(g_sqr_data_[index + offset] + e) * + // delta[index] / option->learning_rate(); } } + + +private: + + float QuakeRsqrt(float number){ + float x = number * 0.5f, y = number; + std::uint32_t i; + std::memcpy(&i, &y, sizeof(float)); + i = 0x5f3759df - (i >> 1); + std::memcpy(&y, &i, sizeof(float)); + return y * (1.5f - (x * y * y)); + } + protected: std::vector< std::vector> historic_g_sqr_; float e; diff --git a/next/include/multiverso/updater/smooth_gradient_updater.h b/next/include/multiverso/updater/momentum_updater.h similarity index 67% rename from next/include/multiverso/updater/smooth_gradient_updater.h rename to next/include/multiverso/updater/momentum_updater.h index 71b2e42..68470c4 100644 --- a/next/include/multiverso/updater/smooth_gradient_updater.h +++ b/next/include/multiverso/updater/momentum_updater.h @@ -1,5 +1,5 @@ -#ifndef MULTIVERSO_UPDATER_SMOOTH_GRADIENT_UPDATER_H_ -#define MULTIVERSO_UPDATER_SMOOTH_GRADIENT_UPDATER_H_ +#ifndef MULTIVERSO_UPDATER_MOMENTUM_UPDATER_H_ +#define MULTIVERSO_UPDATER_MOMENTUM_UPDATER_H_ #include "updater.h" #include @@ -7,9 +7,9 @@ namespace multiverso { template -class SmoothGradientUpdater : public Updater { +class MomentumUpdater : public Updater { public: - explicit SmoothGradientUpdater(size_t size) : size_(size) { + explicit MomentumUpdater(size_t size) : size_(size) { Log::Debug("[SmoothGradientUpdater] Init with size = %d. \n", size_); smooth_gradient_.resize(size_); } @@ -22,7 +22,7 @@ public: data[index + offset] -= smooth_gradient_[index + offset]; } } - ~SmoothGradientUpdater() { smooth_gradient_.clear(); } + ~MomentumUpdater() { smooth_gradient_.clear(); } protected: std::vector smooth_gradient_; size_t size_; @@ -30,4 +30,4 @@ protected: } -#endif // MULTIVERSO_UPDATER_SMOOTH_GRADIENT_UPDATER_H_ \ No newline at end of file +#endif // MULTIVERSO_UPDATER_MOMENTUM_UPDATER_H_ \ No newline at end of file diff --git a/next/include/multiverso/updater/second_order_gradient_updater.h b/next/include/multiverso/updater/second_order_gradient_updater.h index 0b1786e..31b373c 100644 --- a/next/include/multiverso/updater/second_order_gradient_updater.h +++ b/next/include/multiverso/updater/second_order_gradient_updater.h @@ -1,58 +1,64 @@ #ifndef MULTIVERSO_UPDATER_SECOND_ORDER_GRADIENT_UPDATER_H_ #define MULTIVERSO_UPDATER_SECOND_ORDER_GRADIENT_UPDATER_H_ +#include +#include + #include "updater.h" -#include namespace multiverso { - // [TODO(qiwye)]:rename the class to Shuxin Zheng's algorithms - template - class SecondOrderUpdater : public Updater { - public: - explicit SecondOrderUpdater(size_t size) : - size_(size) { - Log::Debug("[SecondOrderUpdater] Init with size = %d. \n", size_); - shadow_copies_.resize(MV_NumWorkers()); - for (auto s : shadow_copies_){ - s.resize(size); - } - historic_g_sqr_.resize(MV_NumWorkers()); - for (auto s : historic_g_sqr_){ - s.resize(size); - } +// [TODO(qiwye)]:rename the class to Shuxin Zheng's algorithms +template +class SecondOrderUpdater : public Updater { +public: + explicit SecondOrderUpdater(size_t size) : + size_(size) { + Log::Debug("[SecondOrderUpdater] Init with size = %d. \n", size_); + shadow_copies_.resize(MV_NumWorkers(), std::vector(size_)); + historic_g_sqr_.resize(MV_NumWorkers(), std::vector(size_)); + } + void Update(size_t num_element, T*data, T*delta, + UpdateOption* option, size_t offset) override { + auto g_sqr_data_ = historic_g_sqr_.at(option->worker_id()); + auto copies_data_ = shadow_copies_.at(option->worker_id()); + for (size_t index = 0; index < num_element; ++index) { + // gsqr = (1 - r) * g^2 + r * gsqr + g_sqr_data_[index + offset] = + (1 - option->rho()) * delta[index] * delta[index] + / option->learning_rate() / option->learning_rate() + + option->rho() * g_sqr_data_[index + offset]; + + data[index + offset] -= delta[index] + option->lambda() * + std::sqrt(g_sqr_data_[index + offset]) * + (data[index + offset] - copies_data_[index + offset]); + + // caching each worker's latest version of parameter + copies_data_[index + offset] = data[index + offset]; } - void Update(size_t num_element, T*data, T*delta, - UpdateOption* option, size_t offset) override { - - for (size_t index = 0; index < num_element; ++index) { - // gsqr = (1 - r) * g^2 + r * gsqr - historic_g_sqr_[option->worker_id()][index + offset] = - (1 - option->rho()) * delta[index] * delta[index] - / option->learning_rate() / option->learning_rate() + - option->rho() * historic_g_sqr_[option->worker_id()][index + offset]; + } + ~SecondOrderUpdater() { + shadow_copies_.clear(); + historic_g_sqr_.clear(); + } - data[index + offset] -= delta[index] + option->lambda() * - std::sqrt(historic_g_sqr_[option->worker_id()][index + offset]) * - (data[index + offset] - shadow_copies_[option->worker_id()][index + offset]); +private: + float QuakeRsqrt(float number) { + float x = number * 0.5f, y = number; + std::uint32_t i; + std::memcpy(&i, &y, sizeof(float)); + i = 0x5f3759df - (i >> 1); + std::memcpy(&y, &i, sizeof(float)); + return y * (1.5f - (x * y * y)); + } - // caching each worker's latest version of parameter - shadow_copies_[option->worker_id()][index + offset] = data[index + offset]; - } - } - ~SecondOrderUpdater() { - shadow_copies_.clear(); - historic_g_sqr_.clear(); - } - protected: - std::vector< std::vector> shadow_copies_; - std::vector< std::vector> historic_g_sqr_; +protected: + std::vector< std::vector> shadow_copies_; + std::vector< std::vector> historic_g_sqr_; - // move these parameter to UpdateOption - size_t size_; - }; + size_t size_; +}; +} // namespace multiverso -} - -#endif // MULTIVERSO_UPDATER_SECOND_ORDER_GRADIENT_UPDATER_H_ \ No newline at end of file +#endif // MULTIVERSO_UPDATER_SECOND_ORDER_GRADIENT_UPDATER_H_ diff --git a/next/include/multiverso/updater/updater.h b/next/include/multiverso/updater/updater.h index 93eea3a..9ec54a9 100644 --- a/next/include/multiverso/updater/updater.h +++ b/next/include/multiverso/updater/updater.h @@ -2,14 +2,21 @@ #define MULTIVERSO_UPDATER_UPDATER_H_ #include +#include #include namespace multiverso { struct UpdateOption { public: - // TODO(feiga): default value; - UpdateOption() { data_[0].i = MV_WorkerId(); } + // TODO(qiwye): make these default value more flexiable + UpdateOption(){ + data_[0].i = MV_WorkerId(); + data_[1].f = 0.0f; + data_[2].f = 0.01f; + data_[3].f = 0.1f; + data_[4].f = 0.1f; + } UpdateOption(const char* data, size_t size) { CopyFrom(data, size); @@ -29,6 +36,15 @@ public: void set_lambda(float lambda) { data_[4].f = lambda; } + std::string toString(){ + std::stringstream ss; + ss << "UpdateOption " << worker_id() << " " << momentum() << " " + << learning_rate() << " " << rho() << " " << lambda() << std::endl; + + return ss.str(); + } + + const char* data() const { return reinterpret_cast(&data_[0]); } size_t size() const { return kSize * sizeof(InternalType); } void CopyFrom(const char* data, size_t size) { diff --git a/next/src/table/matrix_table.cpp b/next/src/table/matrix_table.cpp index 6941bea..e35a79e 100644 --- a/next/src/table/matrix_table.cpp +++ b/next/src/table/matrix_table.cpp @@ -138,7 +138,7 @@ int MatrixWorkerTable::Partition(const std::vector& kv, int rank = MV_ServerIdToRank(it.first); std::vector& vec = (*out)[rank]; vec.push_back(Blob(it.second * sizeof(int))); - if (kv.size() == 2) vec.push_back(Blob(it.second * row_size_)); + if (kv.size() >= 2) vec.push_back(Blob(it.second * row_size_)); } count.clear(); @@ -147,7 +147,7 @@ int MatrixWorkerTable::Partition(const std::vector& kv, int dst = dest[i]; int rank = MV_ServerIdToRank(dst); (*out)[rank][0].As(count[dst]) = keys[i]; - if (kv.size() == 2){ // copy add values + if (kv.size() >= 2){ // copy add values memcpy(&((*out)[rank][1].As(count[dst] * num_col_)), kv[1].data() + offset, row_size_); offset += row_size_; @@ -244,7 +244,9 @@ void MatrixServerTable::ProcessAdd(const std::vector& data) { server_id_, row_offset_, ssize / num_col_); } else { - CHECK(data[1].size() == keys_size * sizeof(T)* num_col_); + Log::Debug("[Debug] Server = %d, keys_size = %d, data[1].size = %d, num_col = %d\n", + server_id_, keys_size, data[1].size() , num_col_); + CHECK(data[1].size() == keys_size * num_col_); int offset_v = 0; CHECK(storage_.size() >= keys_size * num_col_); diff --git a/next/src/updater/updater.cpp b/next/src/updater/updater.cpp index 2665389..c9d3a00 100644 --- a/next/src/updater/updater.cpp +++ b/next/src/updater/updater.cpp @@ -1,7 +1,7 @@ #include "multiverso/updater/updater.h" #include "multiverso/updater/adagrad_updater.h" -#include "multiverso/updater/smooth_gradient_updater.h" +#include "multiverso/updater/momentum_updater.h" #include "multiverso/updater/second_order_gradient_updater.h" #include "multiverso/updater/sgd_updater.h" #include "multiverso/util/configure.h" @@ -32,11 +32,10 @@ Updater* Updater::GetUpdater(size_t) { template Updater* Updater::GetUpdater(size_t size) { std::string type = MV_CONFIG_updater_type; - printf(type.c_str()); if (type == "sgd") return new SGDUpdater(size); if (type == "adagrad") return new AdaGradUpdater(size); - if (type == "smooth_gradient") return new SmoothGradientUpdater(size); - if (type == "second_order") return new SecondOrderUpdater(size); + if (type == "momentum_sgd") return new MomentumUpdater(size); + if (type == "second_order_sgd") return new SecondOrderUpdater(size); // Default: simple updater return new Updater(); }