fix, remove atomic thread from header file, add matrix table option

This commit is contained in:
feiga 2016-05-19 20:58:17 +08:00
Родитель f1e7fd21d2
Коммит b55e90b051
15 изменённых файлов: 154 добавлений и 93 удалений

Просмотреть файл

@ -208,8 +208,12 @@ void TestMatrix(int argc, char* argv[]){
int num_row = 8, num_col = 3592;
int size = num_row * num_col;
MatrixWorkerTable<int>* worker_table = new MatrixWorkerTable<int>(num_row, num_col);
MatrixServerTable<int>* server_table = new MatrixServerTable<int>(num_row, num_col);
// MatrixWorkerTable<int>* worker_table = new MatrixWorkerTable<int>(num_row, num_col);
// MatrixServerTable<int>* server_table = new MatrixServerTable<int>(num_row, num_col);
MatrixTableOption<int> option;
option.num_row = num_row;
option.num_col = num_col;
MatrixWorkerTable<int>* worker_table = multiverso::MV_CreateTable(option);
std::thread* m_prefetchThread = nullptr;
MV_Barrier();
int count = 0;
@ -258,6 +262,7 @@ void TestMatrix(int argc, char* argv[]){
MV_Barrier();
}
delete worker_table;
MV_ShutDown();
}

Просмотреть файл

@ -1,15 +1,15 @@
#ifndef MULTIVERSO_ACTOR_H_
#define MULTIVERSO_ACTOR_H_
#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include "multiverso/message.h"
namespace std { class thread; }
namespace multiverso {
template<typename T> class MtQueue;
@ -47,7 +47,6 @@ protected:
std::unique_ptr<MtQueue<MessagePtr> > mailbox_;
// message handlers function
std::unordered_map<int, Handler> handlers_;
// std::atomic_bool is_working_;
bool is_working_;
private:
std::string name_;

Просмотреть файл

@ -7,80 +7,57 @@
#include <iostream>
#include "multiverso/util/log.h"
#include "multiverso/util/allocator.h"
namespace multiverso {
// Manage a chunk of memory. Blob can share memory with other Blobs.
// Never use external memory. All external memory should be managed by itself
// TODO(feiga): maybe make blob also not hold memory?
class Blob {
public:
// an empty blob
Blob() : data_(nullptr) {}
// Manage a chunk of memory. Blob can share memory with other Blobs.
// Never use external memory. All external memory should be managed by itself
// TODO(feiga): maybe make blob also not hold memory?
class Blob {
public:
// an empty blob
Blob() : data_(nullptr) {}
explicit Blob(size_t size) : size_(size) {
CHECK(size > 0);
data_ = Allocator::Get()->Alloc(size);
//data_.reset(new char[size]);
}
explicit Blob(size_t size);
// Construct from external memory. Will copy a new piece
Blob(const void* data, size_t size) : size_(size) {
data_ = Allocator::Get()->Alloc(size);
memcpy(data_, data, size_);
}
// Construct from external memory. Will copy a new piece
Blob(const void* data, size_t size);
Blob(void* data, size_t size) : size_(size) {
data_ = Allocator::Get()->Alloc(size);
memcpy(data_, data, size_);
}
Blob(void* data, size_t size);
Blob(const Blob& rhs) {
Allocator::Get()->Refer(rhs.data_);
this->data_ = rhs.data_;
this->size_ = rhs.size_;
}
Blob(const Blob& rhs);
~Blob() {
if (data_ != nullptr) {
Allocator::Get()->Free(data_);
}
}
~Blob();
// Shallow copy by default. Call \ref CopyFrom for a deep copy
void operator=(const Blob& rhs) {
Allocator::Get()->Refer(rhs.data_);
this->data_ = rhs.data_;
this->size_ = rhs.size_;
}
// Shallow copy by default. Call \ref CopyFrom for a deep copy
void operator=(const Blob& rhs);
inline char operator[](size_t i) const {
CHECK(0 <= i && i < size_);
return data_[i];
}
inline char operator[](size_t i) const {
CHECK(0 <= i && i < size_);
return data_[i];
}
template <typename T>
inline T& As(size_t i = 0) const {
CHECK(size_ % sizeof(T) == 0 && i < size_ / sizeof(T));
return (reinterpret_cast<T*>(data_))[i];
}
template <typename T>
inline size_t size() const { return size_ / sizeof(T); }
template <typename T>
inline T& As(size_t i = 0) const {
CHECK(size_ % sizeof(T) == 0 && i < size_ / sizeof(T));
return (reinterpret_cast<T*>(data_))[i];
}
template <typename T>
inline size_t size() const { return size_ / sizeof(T); }
// DeepCopy, for a shallow copy, use operator=
void CopyFrom(const Blob& src);
// DeepCopy, for a shallow copy, use operator=
void CopyFrom(const Blob& src);
inline char* data() const { return data_; }
inline size_t size() const { return size_; }
inline char* data() const { return data_; }
inline size_t size() const { return size_; }
private:
private:
// Memory is shared and auto managed
//std::shared_ptr<char> data_;
char *data_;
size_t size_;
};
// Memory is shared and auto managed
//std::shared_ptr<char> data_;
char *data_;
size_t size_;
};
} // namespace multiverso

Просмотреть файл

@ -9,10 +9,16 @@
namespace multiverso {
template <typename T>
struct MatrixTableOption;
template <typename T>
class MatrixWorkerTable : public WorkerTable {
public:
explicit MatrixWorkerTable(const MatrixTableOption<T>& option);
MatrixWorkerTable(integer_t num_row, integer_t num_col);
~MatrixWorkerTable();
// get whole table, data is user-allocated memory
@ -55,6 +61,8 @@ class Updater;
template <typename T>
class MatrixServerTable : public ServerTable {
public:
explicit MatrixServerTable(const MatrixTableOption<T>& option);
MatrixServerTable(integer_t num_row, integer_t num_col);
void ProcessAdd(const std::vector<Blob>& data) override;
@ -74,6 +82,13 @@ protected:
std::vector<T> storage_;
};
template <typename T>
struct MatrixTableOption {
integer_t num_row;
integer_t num_col;
DEFINE_TABLE_TYPE(T, MatrixWorkerTable, MatrixServerTable);
};
}
#endif // MULTIVERSO_MATRIX_TABLE_H_

Просмотреть файл

@ -1,7 +1,6 @@
#ifndef MULTIVERSO_TABLE_INTERFACE_H_
#define MULTIVERSO_TABLE_INTERFACE_H_
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
@ -10,6 +9,8 @@
#include "multiverso/blob.h"
#include "multiverso/io/io.h"
namespace std { class mutex; }
namespace multiverso {
typedef int32_t integer_t;
@ -22,7 +23,7 @@ struct GetOption;
class WorkerTable {
public:
WorkerTable();
virtual ~WorkerTable() = default;
virtual ~WorkerTable();
void Get(Blob keys, const GetOption* option = nullptr);
void Add(Blob keys, Blob values, const AddOption* option = nullptr);
@ -46,7 +47,7 @@ private:
std::string table_name_;
// assuming there are at most 2^32 tables
int table_id_;
std::mutex m_;
std::mutex* m_;
std::vector<Waiter*> waitings_;
int msg_id_;
};

Просмотреть файл

@ -1,10 +1,11 @@
#ifndef MULTIVERSO_ALLOCATOR_H_
#define MULTIVERSO_ALLOCATOR_H_
#include <mutex>
#include <atomic>
#include <unordered_map>
namespace std { class mutex; }
namespace multiverso {
const size_t g_pointer_size = sizeof(void*);
@ -19,7 +20,7 @@ public:
private:
MemoryBlock* free_ = nullptr;
size_t size_;
std::mutex mutex_;
std::mutex* mutex_;
};
class MemoryBlock {
@ -49,13 +50,14 @@ private:
class SmartAllocator : public Allocator {
public:
SmartAllocator();
~SmartAllocator();
char* Alloc(size_t size);
void Free(char* data);
void Refer(char *data);
~SmartAllocator();
private:
std::unordered_map<size_t, FreeList*> pools_;
std::mutex mutex_;
std::mutex* mutex_;
};
} // namespace multiverso

Просмотреть файл

@ -1,7 +1,6 @@
#ifndef MULTIVERSO_ZOO_H_
#define MULTIVERSO_ZOO_H_
#include <atomic>
#include <string>
#include <vector>
#include <unordered_map>

Просмотреть файл

@ -211,6 +211,7 @@
</ItemGroup>
<ItemGroup>
<ClCompile Include="actor.cpp" />
<ClCompile Include="blob.cpp" />
<ClCompile Include="communicator.cpp" />
<ClCompile Include="controller.cpp" />
<ClCompile Include="dashboard.cpp" />

Просмотреть файл

@ -221,5 +221,8 @@
<ClCompile Include="table_factory.cpp">
<Filter>system</Filter>
</ClCompile>
<ClCompile Include="blob.cpp">
<Filter>system</Filter>
</ClCompile>
</ItemGroup>
</Project>

Просмотреть файл

@ -2,6 +2,7 @@
#include <chrono>
#include <string>
#include <thread>
#include "multiverso/message.h"
#include "multiverso/util/log.h"

42
src/blob.cpp Normal file
Просмотреть файл

@ -0,0 +1,42 @@
#include "multiverso/blob.h"
#include "multiverso/util/allocator.h"
namespace multiverso {
Blob::Blob(size_t size) : size_(size) {
CHECK(size > 0);
data_ = Allocator::Get()->Alloc(size);
//data_.reset(new char[size]);
}
// Construct from external memory. Will copy a new piece
Blob::Blob(const void* data, size_t size) : size_(size) {
data_ = Allocator::Get()->Alloc(size);
memcpy(data_, data, size_);
}
Blob::Blob(void* data, size_t size) : size_(size) {
data_ = Allocator::Get()->Alloc(size);
memcpy(data_, data, size_);
}
Blob::Blob(const Blob& rhs) {
Allocator::Get()->Refer(rhs.data_);
this->data_ = rhs.data_;
this->size_ = rhs.size_;
}
Blob::~Blob() {
if (data_ != nullptr) {
Allocator::Get()->Free(data_);
}
}
// Shallow copy by default. Call \ref CopyFrom for a deep copy
void Blob::operator=(const Blob& rhs) {
Allocator::Get()->Refer(rhs.data_);
this->data_ = rhs.data_;
this->size_ = rhs.size_;
}
}

Просмотреть файл

@ -1,6 +1,7 @@
#include "multiverso/communicator.h"
#include <memory>
#include <thread>
#include "multiverso/zoo.h"
#include "multiverso/net.h"
@ -10,7 +11,6 @@
namespace multiverso {
namespace message {
// TODO(feiga): refator the ugly statement
bool to_server(MsgType type) {
return (static_cast<int>(type)) > 0 &&
(static_cast<int>(type)) < 32;
@ -24,7 +24,6 @@ bool to_worker(MsgType type) {
bool to_controler(MsgType type) {
return (static_cast<int>(type)) > 32;
}
} // namespace message
Communicator::Communicator() : Actor(actor::kCommunicator) {

Просмотреть файл

@ -12,9 +12,14 @@ namespace multiverso {
WorkerTable::WorkerTable() {
msg_id_ = 0;
m_ = new std::mutex();
table_id_ = Zoo::Get()->RegisterTable(this);
}
WorkerTable::~WorkerTable() {
delete m_;
}
ServerTable::ServerTable() {
Zoo::Get()->RegisterTable(this);
}
@ -36,10 +41,10 @@ void WorkerTable::Add(Blob keys, Blob values,
int WorkerTable::GetAsync(Blob keys,
const GetOption* option) {
m_.lock();
m_->lock();
int id = msg_id_++;
waitings_.push_back(new Waiter());
m_.unlock();
m_->unlock();
MessagePtr msg(new Message());
msg->set_src(Zoo::Get()->rank());
msg->set_type(MsgType::Request_Get);
@ -57,10 +62,10 @@ int WorkerTable::GetAsync(Blob keys,
int WorkerTable::AddAsync(Blob keys, Blob values,
const AddOption* option) {
m_.lock();
m_->lock();
int id = msg_id_++;
waitings_.push_back(new Waiter());
m_.unlock();
m_->unlock();
MessagePtr msg(new Message());
msg->set_src(Zoo::Get()->rank());
msg->set_type(MsgType::Request_Add);
@ -79,31 +84,31 @@ int WorkerTable::AddAsync(Blob keys, Blob values,
void WorkerTable::Wait(int id) {
// CHECK(waitings_.find(id) != waitings_.end());
m_.lock();
m_->lock();
CHECK(waitings_[id] != nullptr);
Waiter* w = waitings_[id];
m_.unlock();
m_->unlock();
w->Wait();
m_.lock();
m_->lock();
delete waitings_[id];
waitings_[id] = nullptr;
m_.unlock();
m_->unlock();
}
void WorkerTable::Reset(int msg_id, int num_wait) {
m_.lock();
m_->lock();
CHECK_NOTNULL(waitings_[msg_id]);
waitings_[msg_id]->Reset(num_wait);
m_.unlock();
m_->unlock();
}
void WorkerTable::Notify(int id) {
m_.lock();
m_->lock();
CHECK_NOTNULL(waitings_[id]);
waitings_[id]->Notify();
m_.unlock();
m_->unlock();
}
} // namespace multiverso

Просмотреть файл

@ -9,6 +9,10 @@
namespace multiverso {
template <typename T>
MatrixWorkerTable<T>::MatrixWorkerTable(const MatrixTableOption<T>& option) :
MatrixWorkerTable(option.num_row, option.num_col) {}
template <typename T>
MatrixWorkerTable<T>::MatrixWorkerTable(integer_t num_row, integer_t num_col) :
WorkerTable(), num_row_(num_row), num_col_(num_col) {
@ -226,8 +230,9 @@ void MatrixWorkerTable<T>::ProcessReplyGet(std::vector<Blob>& reply_data) {
if (--get_reply_count_ == 0) { }
}
template <typename T>
MatrixServerTable<T>::MatrixServerTable(const MatrixTableOption<T>& option) :
MatrixServerTable(option.num_row, option.num_col) {}
template <typename T>
MatrixServerTable<T>::MatrixServerTable(integer_t num_row, integer_t num_col) :

Просмотреть файл

@ -1,5 +1,7 @@
#include "multiverso/util/allocator.h"
#include <mutex>
#include "multiverso/util/log.h"
#include "multiverso/util/configure.h"
@ -27,8 +29,8 @@ inline void AlignFree(char *data) {
#endif
}
inline FreeList::FreeList(size_t size) :
size_(size) {
inline FreeList::FreeList(size_t size) : size_(size) {
mutex_ = new std::mutex();
free_ = new MemoryBlock(size, this);
}
@ -42,7 +44,7 @@ FreeList::~FreeList() {
}
inline char* FreeList::Pop() {
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::mutex> lock(*mutex_);
if (free_ == nullptr) {
free_ = new MemoryBlock(size_, this);
}
@ -52,7 +54,7 @@ inline char* FreeList::Pop() {
}
inline void FreeList::Push(MemoryBlock*block) {
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::mutex> lock(*mutex_);
block->next = free_;
free_ = block;
}
@ -98,7 +100,7 @@ char* SmartAllocator::Alloc(size_t size) {
size += 1;
}
std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<std::mutex> lock(*mutex_);
if (pools_[size] == nullptr) {
pools_[size] = new FreeList(size);
}
@ -115,8 +117,13 @@ void SmartAllocator::Refer(char *data) {
(*(MemoryBlock**)(data - g_pointer_size))->Link();
}
SmartAllocator::SmartAllocator() {
mutex_ = new std::mutex();
}
SmartAllocator::~SmartAllocator() {
Log::Debug("~SmartAllocator, final pool size: %d\n", pools_.size());
delete mutex_;
for (auto i : pools_) {
delete i.second;
}