diff --git a/Test/main.cpp b/Test/main.cpp index eb45e87..ba91e3f 100644 --- a/Test/main.cpp +++ b/Test/main.cpp @@ -208,8 +208,12 @@ void TestMatrix(int argc, char* argv[]){ int num_row = 8, num_col = 3592; int size = num_row * num_col; - MatrixWorkerTable* worker_table = new MatrixWorkerTable(num_row, num_col); - MatrixServerTable* server_table = new MatrixServerTable(num_row, num_col); + // MatrixWorkerTable* worker_table = new MatrixWorkerTable(num_row, num_col); + // MatrixServerTable* server_table = new MatrixServerTable(num_row, num_col); + MatrixTableOption option; + option.num_row = num_row; + option.num_col = num_col; + MatrixWorkerTable* worker_table = multiverso::MV_CreateTable(option); std::thread* m_prefetchThread = nullptr; MV_Barrier(); int count = 0; @@ -258,6 +262,7 @@ void TestMatrix(int argc, char* argv[]){ MV_Barrier(); } + delete worker_table; MV_ShutDown(); } diff --git a/include/multiverso/actor.h b/include/multiverso/actor.h index 1318a9b..531eed0 100644 --- a/include/multiverso/actor.h +++ b/include/multiverso/actor.h @@ -1,15 +1,15 @@ #ifndef MULTIVERSO_ACTOR_H_ #define MULTIVERSO_ACTOR_H_ -#include #include #include #include -#include #include #include "multiverso/message.h" +namespace std { class thread; } + namespace multiverso { template class MtQueue; @@ -47,7 +47,6 @@ protected: std::unique_ptr > mailbox_; // message handlers function std::unordered_map handlers_; - // std::atomic_bool is_working_; bool is_working_; private: std::string name_; diff --git a/include/multiverso/blob.h b/include/multiverso/blob.h index 29f138f..21cd8e3 100644 --- a/include/multiverso/blob.h +++ b/include/multiverso/blob.h @@ -7,80 +7,57 @@ #include #include "multiverso/util/log.h" -#include "multiverso/util/allocator.h" namespace multiverso { - // Manage a chunk of memory. Blob can share memory with other Blobs. - // Never use external memory. All external memory should be managed by itself - // TODO(feiga): maybe make blob also not hold memory? - class Blob { - public: - // an empty blob - Blob() : data_(nullptr) {} +// Manage a chunk of memory. Blob can share memory with other Blobs. +// Never use external memory. All external memory should be managed by itself +// TODO(feiga): maybe make blob also not hold memory? +class Blob { +public: + // an empty blob + Blob() : data_(nullptr) {} - explicit Blob(size_t size) : size_(size) { - CHECK(size > 0); - data_ = Allocator::Get()->Alloc(size); - //data_.reset(new char[size]); - } + explicit Blob(size_t size); - // Construct from external memory. Will copy a new piece - Blob(const void* data, size_t size) : size_(size) { - data_ = Allocator::Get()->Alloc(size); - memcpy(data_, data, size_); - } + // Construct from external memory. Will copy a new piece + Blob(const void* data, size_t size); - Blob(void* data, size_t size) : size_(size) { - data_ = Allocator::Get()->Alloc(size); - memcpy(data_, data, size_); - } + Blob(void* data, size_t size); - Blob(const Blob& rhs) { - Allocator::Get()->Refer(rhs.data_); - this->data_ = rhs.data_; - this->size_ = rhs.size_; - } + Blob(const Blob& rhs); - ~Blob() { - if (data_ != nullptr) { - Allocator::Get()->Free(data_); - } - } + ~Blob(); - // Shallow copy by default. Call \ref CopyFrom for a deep copy - void operator=(const Blob& rhs) { - Allocator::Get()->Refer(rhs.data_); - this->data_ = rhs.data_; - this->size_ = rhs.size_; - } + // Shallow copy by default. Call \ref CopyFrom for a deep copy + void operator=(const Blob& rhs); - inline char operator[](size_t i) const { - CHECK(0 <= i && i < size_); - return data_[i]; - } + inline char operator[](size_t i) const { + CHECK(0 <= i && i < size_); + return data_[i]; + } - template - inline T& As(size_t i = 0) const { - CHECK(size_ % sizeof(T) == 0 && i < size_ / sizeof(T)); - return (reinterpret_cast(data_))[i]; - } - template - inline size_t size() const { return size_ / sizeof(T); } + template + inline T& As(size_t i = 0) const { + CHECK(size_ % sizeof(T) == 0 && i < size_ / sizeof(T)); + return (reinterpret_cast(data_))[i]; + } + template + inline size_t size() const { return size_ / sizeof(T); } - // DeepCopy, for a shallow copy, use operator= - void CopyFrom(const Blob& src); + // DeepCopy, for a shallow copy, use operator= + void CopyFrom(const Blob& src); - inline char* data() const { return data_; } - inline size_t size() const { return size_; } + inline char* data() const { return data_; } + inline size_t size() const { return size_; } - private: +private: - // Memory is shared and auto managed - //std::shared_ptr data_; - char *data_; - size_t size_; - }; + // Memory is shared and auto managed + //std::shared_ptr data_; + char *data_; + size_t size_; +}; } // namespace multiverso diff --git a/include/multiverso/table/matrix_table.h b/include/multiverso/table/matrix_table.h index 89c12ca..2b8d9bb 100644 --- a/include/multiverso/table/matrix_table.h +++ b/include/multiverso/table/matrix_table.h @@ -9,10 +9,16 @@ namespace multiverso { +template +struct MatrixTableOption; + template class MatrixWorkerTable : public WorkerTable { public: + explicit MatrixWorkerTable(const MatrixTableOption& option); + MatrixWorkerTable(integer_t num_row, integer_t num_col); + ~MatrixWorkerTable(); // get whole table, data is user-allocated memory @@ -55,6 +61,8 @@ class Updater; template class MatrixServerTable : public ServerTable { public: + explicit MatrixServerTable(const MatrixTableOption& option); + MatrixServerTable(integer_t num_row, integer_t num_col); void ProcessAdd(const std::vector& data) override; @@ -74,6 +82,13 @@ protected: std::vector storage_; }; +template +struct MatrixTableOption { + integer_t num_row; + integer_t num_col; + DEFINE_TABLE_TYPE(T, MatrixWorkerTable, MatrixServerTable); +}; + } #endif // MULTIVERSO_MATRIX_TABLE_H_ diff --git a/include/multiverso/table_interface.h b/include/multiverso/table_interface.h index d53c6d6..e4ed373 100644 --- a/include/multiverso/table_interface.h +++ b/include/multiverso/table_interface.h @@ -1,7 +1,6 @@ #ifndef MULTIVERSO_TABLE_INTERFACE_H_ #define MULTIVERSO_TABLE_INTERFACE_H_ -#include #include #include #include @@ -10,6 +9,8 @@ #include "multiverso/blob.h" #include "multiverso/io/io.h" +namespace std { class mutex; } + namespace multiverso { typedef int32_t integer_t; @@ -22,7 +23,7 @@ struct GetOption; class WorkerTable { public: WorkerTable(); - virtual ~WorkerTable() = default; + virtual ~WorkerTable(); void Get(Blob keys, const GetOption* option = nullptr); void Add(Blob keys, Blob values, const AddOption* option = nullptr); @@ -46,7 +47,7 @@ private: std::string table_name_; // assuming there are at most 2^32 tables int table_id_; - std::mutex m_; + std::mutex* m_; std::vector waitings_; int msg_id_; }; diff --git a/include/multiverso/util/allocator.h b/include/multiverso/util/allocator.h index afaa4fa..0b4e5f6 100644 --- a/include/multiverso/util/allocator.h +++ b/include/multiverso/util/allocator.h @@ -1,10 +1,11 @@ #ifndef MULTIVERSO_ALLOCATOR_H_ #define MULTIVERSO_ALLOCATOR_H_ -#include #include #include +namespace std { class mutex; } + namespace multiverso { const size_t g_pointer_size = sizeof(void*); @@ -19,7 +20,7 @@ public: private: MemoryBlock* free_ = nullptr; size_t size_; - std::mutex mutex_; + std::mutex* mutex_; }; class MemoryBlock { @@ -49,13 +50,14 @@ private: class SmartAllocator : public Allocator { public: + SmartAllocator(); + ~SmartAllocator(); char* Alloc(size_t size); void Free(char* data); void Refer(char *data); - ~SmartAllocator(); private: std::unordered_map pools_; - std::mutex mutex_; + std::mutex* mutex_; }; } // namespace multiverso diff --git a/include/multiverso/zoo.h b/include/multiverso/zoo.h index 4aa4a38..dfe2aa2 100644 --- a/include/multiverso/zoo.h +++ b/include/multiverso/zoo.h @@ -1,7 +1,6 @@ #ifndef MULTIVERSO_ZOO_H_ #define MULTIVERSO_ZOO_H_ -#include #include #include #include diff --git a/src/Multiverso.vcxproj b/src/Multiverso.vcxproj index 8396fc7..cde8af2 100644 --- a/src/Multiverso.vcxproj +++ b/src/Multiverso.vcxproj @@ -211,6 +211,7 @@ + diff --git a/src/Multiverso.vcxproj.filters b/src/Multiverso.vcxproj.filters index 755089c..91e0a44 100644 --- a/src/Multiverso.vcxproj.filters +++ b/src/Multiverso.vcxproj.filters @@ -221,5 +221,8 @@ system + + system + \ No newline at end of file diff --git a/src/actor.cpp b/src/actor.cpp index 4fe6dae..6335eb5 100644 --- a/src/actor.cpp +++ b/src/actor.cpp @@ -2,6 +2,7 @@ #include #include +#include #include "multiverso/message.h" #include "multiverso/util/log.h" diff --git a/src/blob.cpp b/src/blob.cpp new file mode 100644 index 0000000..11f1ede --- /dev/null +++ b/src/blob.cpp @@ -0,0 +1,42 @@ +#include "multiverso/blob.h" + +#include "multiverso/util/allocator.h" + +namespace multiverso { + +Blob::Blob(size_t size) : size_(size) { + CHECK(size > 0); + data_ = Allocator::Get()->Alloc(size); + //data_.reset(new char[size]); +} + +// Construct from external memory. Will copy a new piece +Blob::Blob(const void* data, size_t size) : size_(size) { + data_ = Allocator::Get()->Alloc(size); + memcpy(data_, data, size_); +} + +Blob::Blob(void* data, size_t size) : size_(size) { + data_ = Allocator::Get()->Alloc(size); + memcpy(data_, data, size_); +} + +Blob::Blob(const Blob& rhs) { + Allocator::Get()->Refer(rhs.data_); + this->data_ = rhs.data_; + this->size_ = rhs.size_; +} + +Blob::~Blob() { + if (data_ != nullptr) { + Allocator::Get()->Free(data_); + } +} + +// Shallow copy by default. Call \ref CopyFrom for a deep copy +void Blob::operator=(const Blob& rhs) { + Allocator::Get()->Refer(rhs.data_); + this->data_ = rhs.data_; + this->size_ = rhs.size_; +} +} \ No newline at end of file diff --git a/src/communicator.cpp b/src/communicator.cpp index 058ff6a..bf03656 100644 --- a/src/communicator.cpp +++ b/src/communicator.cpp @@ -1,6 +1,7 @@ #include "multiverso/communicator.h" #include +#include #include "multiverso/zoo.h" #include "multiverso/net.h" @@ -10,7 +11,6 @@ namespace multiverso { namespace message { -// TODO(feiga): refator the ugly statement bool to_server(MsgType type) { return (static_cast(type)) > 0 && (static_cast(type)) < 32; @@ -24,7 +24,6 @@ bool to_worker(MsgType type) { bool to_controler(MsgType type) { return (static_cast(type)) > 32; } - } // namespace message Communicator::Communicator() : Actor(actor::kCommunicator) { diff --git a/src/table.cpp b/src/table.cpp index d5939d2..bcbfb71 100644 --- a/src/table.cpp +++ b/src/table.cpp @@ -12,9 +12,14 @@ namespace multiverso { WorkerTable::WorkerTable() { msg_id_ = 0; + m_ = new std::mutex(); table_id_ = Zoo::Get()->RegisterTable(this); } +WorkerTable::~WorkerTable() { + delete m_; +} + ServerTable::ServerTable() { Zoo::Get()->RegisterTable(this); } @@ -36,10 +41,10 @@ void WorkerTable::Add(Blob keys, Blob values, int WorkerTable::GetAsync(Blob keys, const GetOption* option) { - m_.lock(); + m_->lock(); int id = msg_id_++; waitings_.push_back(new Waiter()); - m_.unlock(); + m_->unlock(); MessagePtr msg(new Message()); msg->set_src(Zoo::Get()->rank()); msg->set_type(MsgType::Request_Get); @@ -57,10 +62,10 @@ int WorkerTable::GetAsync(Blob keys, int WorkerTable::AddAsync(Blob keys, Blob values, const AddOption* option) { - m_.lock(); + m_->lock(); int id = msg_id_++; waitings_.push_back(new Waiter()); - m_.unlock(); + m_->unlock(); MessagePtr msg(new Message()); msg->set_src(Zoo::Get()->rank()); msg->set_type(MsgType::Request_Add); @@ -79,31 +84,31 @@ int WorkerTable::AddAsync(Blob keys, Blob values, void WorkerTable::Wait(int id) { // CHECK(waitings_.find(id) != waitings_.end()); - m_.lock(); + m_->lock(); CHECK(waitings_[id] != nullptr); Waiter* w = waitings_[id]; - m_.unlock(); + m_->unlock(); w->Wait(); - m_.lock(); + m_->lock(); delete waitings_[id]; waitings_[id] = nullptr; - m_.unlock(); + m_->unlock(); } void WorkerTable::Reset(int msg_id, int num_wait) { - m_.lock(); + m_->lock(); CHECK_NOTNULL(waitings_[msg_id]); waitings_[msg_id]->Reset(num_wait); - m_.unlock(); + m_->unlock(); } void WorkerTable::Notify(int id) { - m_.lock(); + m_->lock(); CHECK_NOTNULL(waitings_[id]); waitings_[id]->Notify(); - m_.unlock(); + m_->unlock(); } } // namespace multiverso diff --git a/src/table/matrix_table.cpp b/src/table/matrix_table.cpp index b66f799..5846f58 100644 --- a/src/table/matrix_table.cpp +++ b/src/table/matrix_table.cpp @@ -9,6 +9,10 @@ namespace multiverso { +template +MatrixWorkerTable::MatrixWorkerTable(const MatrixTableOption& option) : +MatrixWorkerTable(option.num_row, option.num_col) {} + template MatrixWorkerTable::MatrixWorkerTable(integer_t num_row, integer_t num_col) : WorkerTable(), num_row_(num_row), num_col_(num_col) { @@ -226,8 +230,9 @@ void MatrixWorkerTable::ProcessReplyGet(std::vector& reply_data) { if (--get_reply_count_ == 0) { } } - - +template +MatrixServerTable::MatrixServerTable(const MatrixTableOption& option) : +MatrixServerTable(option.num_row, option.num_col) {} template MatrixServerTable::MatrixServerTable(integer_t num_row, integer_t num_col) : diff --git a/src/util/allocator.cpp b/src/util/allocator.cpp index 2ee1c41..a8226a4 100644 --- a/src/util/allocator.cpp +++ b/src/util/allocator.cpp @@ -1,5 +1,7 @@ #include "multiverso/util/allocator.h" +#include + #include "multiverso/util/log.h" #include "multiverso/util/configure.h" @@ -27,8 +29,8 @@ inline void AlignFree(char *data) { #endif } -inline FreeList::FreeList(size_t size) : -size_(size) { +inline FreeList::FreeList(size_t size) : size_(size) { + mutex_ = new std::mutex(); free_ = new MemoryBlock(size, this); } @@ -42,7 +44,7 @@ FreeList::~FreeList() { } inline char* FreeList::Pop() { - std::lock_guard lock(mutex_); + std::lock_guard lock(*mutex_); if (free_ == nullptr) { free_ = new MemoryBlock(size_, this); } @@ -52,7 +54,7 @@ inline char* FreeList::Pop() { } inline void FreeList::Push(MemoryBlock*block) { - std::lock_guard lock(mutex_); + std::lock_guard lock(*mutex_); block->next = free_; free_ = block; } @@ -98,7 +100,7 @@ char* SmartAllocator::Alloc(size_t size) { size += 1; } - std::unique_lock lock(mutex_); + std::unique_lock lock(*mutex_); if (pools_[size] == nullptr) { pools_[size] = new FreeList(size); } @@ -115,8 +117,13 @@ void SmartAllocator::Refer(char *data) { (*(MemoryBlock**)(data - g_pointer_size))->Link(); } +SmartAllocator::SmartAllocator() { + mutex_ = new std::mutex(); +} + SmartAllocator::~SmartAllocator() { Log::Debug("~SmartAllocator, final pool size: %d\n", pools_.size()); + delete mutex_; for (auto i : pools_) { delete i.second; }