From e8d02c42586856bb2e3ff2656c7f767ab1674d6d Mon Sep 17 00:00:00 2001 From: Qiwei Ye Date: Fri, 19 Feb 2016 16:07:49 +0800 Subject: [PATCH] adding array table with smooth gradient --- next/IMultiverso.vcxproj | 58 +------ next/IMultiverso.vcxproj.filters | 3 + next/Test/Test.vcxproj | 63 -------- next/Test/main.cpp | 37 +++++ next/include/multiverso/table/array_table.h | 6 +- .../multiverso/table/smooth_array_table.h | 145 ++++++++++++++++++ next/src/mpi_net.cpp | 6 +- 7 files changed, 192 insertions(+), 126 deletions(-) create mode 100644 next/include/multiverso/table/smooth_array_table.h diff --git a/next/IMultiverso.vcxproj b/next/IMultiverso.vcxproj index 9c90b40..b4cb34a 100644 --- a/next/IMultiverso.vcxproj +++ b/next/IMultiverso.vcxproj @@ -1,18 +1,10 @@  - - debug - Win32 - debug x64 - - release - Win32 - release x64 @@ -28,25 +20,12 @@ SAK - - StaticLibrary - true - v120 - Unicode - StaticLibrary true v120 Unicode - - StaticLibrary - false - v120 - true - Unicode - StaticLibrary false @@ -57,15 +36,9 @@ - - - - - - @@ -76,19 +49,6 @@ $(ThirdPartyPath)\ZeroMQ 4.0.4\include;$(MSMPI_INC);$(ProjectDir)\include;$(VC_IncludePath);$(WindowsSDK_IncludePath); - - - - - Level3 - Disabled - WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) - - - Windows - true - - @@ -102,23 +62,6 @@ true - - - Level3 - - - MaxSpeed - true - true - WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) - - - Windows - true - true - true - - Level3 @@ -155,6 +98,7 @@ + diff --git a/next/IMultiverso.vcxproj.filters b/next/IMultiverso.vcxproj.filters index 2711ec5..20983dc 100644 --- a/next/IMultiverso.vcxproj.filters +++ b/next/IMultiverso.vcxproj.filters @@ -61,6 +61,9 @@ system + + include\table + diff --git a/next/Test/Test.vcxproj b/next/Test/Test.vcxproj index 2a107a6..4fc8c1a 100644 --- a/next/Test/Test.vcxproj +++ b/next/Test/Test.vcxproj @@ -1,18 +1,10 @@  - - Debug - Win32 - Debug x64 - - Release - Win32 - Release x64 @@ -28,25 +20,12 @@ Test - - Application - true - v120 - Unicode - Application true v120 Unicode - - Application - false - v120 - true - Unicode - Application false @@ -57,48 +36,23 @@ - - - - - - - - true - true $(SolutionDir)/source;$(SolutionDir)/include;$(VC_IncludePath);$(WindowsSDK_IncludePath); $(SolutionDir)/x64/$(Configuration);$(VC_LibraryPath_x64);$(WindowsSDK_LibraryPath_x64) - - false - false $(ThirdPartyPath)\ZeroMQ 4.0.4\include;$(SolutionDir)/src;$(SolutionDir)/include;$(VC_IncludePath);$(WindowsSDK_IncludePath); $(SolutionDir)/x64/$(Configuration);$(VC_LibraryPath_x64);$(WindowsSDK_LibraryPath_x64) - - - - - Level3 - Disabled - WIN32;_DEBUG;_CONSOLE;_LIB;%(PreprocessorDefinitions) - - - Console - true - - @@ -112,23 +66,6 @@ true - - - Level3 - - - MaxSpeed - true - true - WIN32;NDEBUG;_CONSOLE;_LIB;%(PreprocessorDefinitions) - - - Console - true - true - true - - Level3 diff --git a/next/Test/main.cpp b/next/Test/main.cpp index 3d43435..8b081ba 100644 --- a/next/Test/main.cpp +++ b/next/Test/main.cpp @@ -5,6 +5,7 @@ #include #include +#include #include using namespace multiverso; @@ -98,6 +99,41 @@ void TestArray() { MultiversoShutDown(); } +void TestMomArray() { + Log::Info("Test smooth_gradient table \n"); + + MultiversoInit(); + + SmoothArrayWorker* shared_array = new SmoothArrayWorker(10); + SmoothArrayServer* server_array = new SmoothArrayServer(10); + + MultiversoBarrier(); + Log::Info("Create tables OK\n"); + + for (int i = 0; i < 10; ++i) { + // std::vector& vec = shared_array->raw(); + + // shared_array->Get(); + float data[10]; + + std::vector delta(10); + for (int i = 1; i <= 10; ++i) + delta[i] = static_cast(i); + + shared_array->Add(delta.data(), 10, 0.5f); + + Log::Info("Rank %d Add OK\n", MultiversoRank()); + + shared_array->Get(data, 10); + Log::Info("Rank %d Get OK\n", MultiversoRank()); + for (int i = 0; i < 10; ++i) + std::cout << data[i] << " "; std::cout << std::endl; + MultiversoBarrier(); + + } + MultiversoShutDown(); +} + void TestNet() { NetInterface* net = NetInterface::Get(); @@ -127,6 +163,7 @@ int main(int argc, char* argv[]) { if (strcmp(argv[1], "kv") == 0) TestKV(); else if (strcmp(argv[1], "array") == 0) TestArray(); else if (strcmp(argv[1], "net") == 0) TestNet(); + else if (strcmp(argv[1], "mom") == 0) TestMomArray(); else CHECK(false); } else { TestArray(); diff --git a/next/include/multiverso/table/array_table.h b/next/include/multiverso/table/array_table.h index cf283c6..9cc7962 100644 --- a/next/include/multiverso/table/array_table.h +++ b/next/include/multiverso/table/array_table.h @@ -1,5 +1,5 @@ -#ifndef MULTIVERSO_STATIC_TABLE_H_ -#define MULTIVERSO_STATIC_TABLE_H_ +#ifndef MULTIVERSO_ARRAY_TABLE_H_ +#define MULTIVERSO_ARRAY_TABLE_H_ #include "multiverso/table_interface.h" #include "multiverso/util/log.h" @@ -128,4 +128,4 @@ private: }; } -#endif // MULTIVERSO_STATIC_TABLE_H_ +#endif // MULTIVERSO_ARRAY_TABLE_H_ diff --git a/next/include/multiverso/table/smooth_array_table.h b/next/include/multiverso/table/smooth_array_table.h new file mode 100644 index 0000000..d2b9cfd --- /dev/null +++ b/next/include/multiverso/table/smooth_array_table.h @@ -0,0 +1,145 @@ +#ifndef MULTIVERSO_SMOOTH_ARRAY_TABLE_H_ +#define MULTIVERSO_SMOOTH_ARRAY_TABLE_H_ + +#include "multiverso/table_interface.h" +#include "multiverso/util/log.h" +#include "multiverso/zoo.h" + +namespace multiverso { + +// A distributed shared std::vector table + +template +class SmoothArrayWorker : public WorkerTable { +public: + explicit SmoothArrayWorker(size_t size) : WorkerTable(), size_(size) { + num_server_ = Zoo::Get()->num_servers(); + server_offsets_.push_back(0); + CHECK(size_ > Zoo::Get()->num_servers()); + int length = static_cast(size_) / Zoo::Get()->num_servers(); + for (int i = 1; i < Zoo::Get()->num_servers(); ++i) { + server_offsets_.push_back(i * length); + } + server_offsets_.push_back(size_); + Log::Debug("worker %d create SmoothArrayTable with %d elements.\n", Zoo::Get()->rank(), size); + } + + T* raw() { return data_; } + + // Get all element + // data is user-allocated memory + void Get(T* data, size_t size) { + CHECK(size == size_); + data_ = data; + int all_key = -1; + Blob whole_table(&all_key, sizeof(int)); + WorkerTable::Get(whole_table); + Log::Debug("worker %d getting all parameters.\n", Zoo::Get()->rank()); + } + + // Add all element + void Add(T* data, size_t size, float smooth_momentum = 0.0) { + CHECK(size == size_); + int all_key = -1; + + Blob key(&all_key, sizeof(int)); + Blob val(data, sizeof(T) * size); + smooth_momentum_ = smooth_momentum; + WorkerTable::Add(key, val); + Log::Debug("worker %d adding parameters with size of %d.\n", Zoo::Get()->rank(), size); + } + + int Partition(const std::vector& kv, + std::unordered_map >* out) override { + CHECK(kv.size() == 1 || kv.size() == 2); // kv.size() == 1 : get msg; + // kv.size() == 2 : add msg; + for (int i = 0; i < num_server_; ++i) + { + (*out)[i].push_back(kv[0]); + } + + if (kv.size() == 2) + { + CHECK(kv[1].size() == size_ * sizeof(T)); + for (int i = 0; i < num_server_; ++i) + { + Blob blob(kv[1].data() + server_offsets_[i] * sizeof(T), + (server_offsets_[i + 1] - server_offsets_[i]) * sizeof(T)); + (*out)[i].push_back(blob); + Blob momentum(&smooth_momentum_, sizeof(float)); // sending coefficent of smooth gradient to server + (*out)[i].push_back(momentum); + } + } + return num_server_; + } + + void ProcessReplyGet(std::vector& reply_data) override { + CHECK(reply_data.size() == 2); + int id = (reply_data[0]).As(); + CHECK(reply_data[1].size() == (server_offsets_[id+1] - server_offsets_[id])); + + // TODO(qiwye): is there a way to reduce this memcpy? + memcpy(data_ + server_offsets_[id], reply_data[1].data(), reply_data[1].size()); + } + +private: + T* data_; // not owned + size_t size_; + int num_server_; + float smooth_momentum_; + std::vector server_offsets_; +}; + +// The storage is a continuous large chunk of memory +template +class SmoothArrayServer : public ServerTable { +public: + explicit SmoothArrayServer(size_t size) : ServerTable() { + server_id_ = Zoo::Get()->rank(); + size_ = size / Zoo::Get()->size(); + if (server_id_ == Zoo::Get()->num_servers()-1) { // last server + size_ += size % Zoo::Get()->num_servers(); + } + storage_.resize(size_); + smooth_gradient_.resize(size_); + smooth_momentum_ = 0.0f; + Log::Debug("server %d create SmoothArrayTable with %d elements of %d elements.\n", server_id_, size_ * 2, size * 2); + } + + void ProcessAdd(const std::vector& data) override { +#ifdef MULTIVERSO_USE_BLAS + // MKL update + Log::Fatal("Not implemented yet\n"); +#else + Blob keys = data[0], values = data[1]; + smooth_momentum_ = data[2].As(); + CHECK(keys.size() == 1 && keys.As() == -1); // Always request whole table + CHECK(values.size() == size_ * sizeof(T)); + for (int i = 0; i < size_; ++i) + { + smooth_gradient_[i] = smooth_momentum_ * smooth_gradient_[i] + (1 - smooth_momentum_) * values.As(i); + storage_[i] += smooth_gradient_[i]; + } +#endif + } + + void ProcessGet(const std::vector& data, + std::vector* result) override { + size_t key_size = data[0].size(); + CHECK(key_size == 1 && data[0].As() == -1); // Always request the whole table + Blob key(sizeof(int)); key.As() = server_id_; + Blob value(storage_.data(), sizeof(T) * size_); + result->push_back(key); + result->push_back(value); + } + +private: + int server_id_; + float smooth_momentum_; + std::vector storage_; + std::vector smooth_gradient_; + size_t size_; // number of element with type T +}; +} + +#endif // MULTIVERSO_SMOOTH_ARRAY_TABLE_H_ diff --git a/next/src/mpi_net.cpp b/next/src/mpi_net.cpp index e6e81b4..ac2e4b7 100644 --- a/next/src/mpi_net.cpp +++ b/next/src/mpi_net.cpp @@ -55,7 +55,7 @@ public: size_t Send(const MessagePtr& msg) override { if (thread_provided_ == MPI_THREAD_SERIALIZED) { - std::lock_guard lock(mutex_); + //std::lock_guard lock(mutex_); return SendMsg(msg); } else if (thread_provided_ == MPI_THREAD_MULTIPLE) { return SendMsg(msg); @@ -74,7 +74,7 @@ public: if (thread_provided_ == MPI_THREAD_SERIALIZED) { // a message come // block receive with lock guard - std::lock_guard lock(mutex_); + //std::lock_guard lock(mutex_); return RecvMsg(msg); } else if (thread_provided_ == MPI_THREAD_MULTIPLE) { return RecvMsg(msg); @@ -114,7 +114,7 @@ public: 0, MPI_COMM_WORLD, &status); size_t size = Message::kHeaderSize; int i = 0; - int flag; + //int flag; int num_probe = 0; while (true) { int count;