Update the multiverso library to the latest version

Qiwei Ye 2016-02-25 16:03:16 +09:00
Parent 42b3b7a214
Commit 73b0ee90db
12 changed files with 749 additions and 39 deletions

View file

@@ -23,38 +23,39 @@ public:
// Stop running the Actor
void Stop();
// Accept a message from other actors
void Accept(MessagePtr&);
void Receive(MessagePtr&);
// Actor name, a unique identifier of an actor
const std::string name() const { return name_; }
// Message response function
using Task = std::function<void(MessagePtr&)>;
protected:
void RegisterTask(const MsgType& type, const Task& task) {
// Message response function
using Handler = std::function<void(MessagePtr&)>;
// Register message handler function
void RegisterHandler(const MsgType& type, const Handler& task) {
handlers_.insert({ type, task });
}
void DeliverTo(const std::string& dst_name, MessagePtr& msg);
// Send a message to a dst actor
void SendTo(const std::string& dst_name, MessagePtr& msg);
// Run in a background thread to receive msg from other actors and process
// messages based on registered handlers
// Main function run in a background thread
// The default main is to receive msg from other actors and process
// messages based on registered message handlers
virtual void Main();
std::string name_;
std::unique_ptr<std::thread> thread_;
// message queue
std::unique_ptr<MtQueue<std::unique_ptr<Message>> > mailbox_;
std::unordered_map<int, Task> handlers_;
std::unique_ptr<MtQueue<MessagePtr> > mailbox_;
// message handler functions
std::unordered_map<int, Handler> handlers_;
};
namespace actor {
const std::string kCommunicator = "communicator";
const std::string kController = "controller";
const std::string kServer = "server";
const std::string kWorker = "worker";
const std::string kCommunicator = "communicator";
const std::string kController = "controller";
const std::string kServer = "server";
const std::string kWorker = "worker";
}
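For context, a minimal sketch of how a derived actor might use the renamed RegisterHandler/SendTo API. The EchoActor class, the Actor(name) constructor call, and the message-type value are illustrative assumptions, not part of this commit.

// Hypothetical derived actor; the constructor signature and MsgType value are assumed.
class EchoActor : public multiverso::Actor {
public:
  EchoActor() : Actor("echo") {  // assumes Actor is constructed with its name
    // The handler runs on the actor's background thread (the default Main loop).
    RegisterHandler(static_cast<multiverso::MsgType>(0) /* illustrative type */,
        [this](multiverso::MessagePtr& msg) {
          // Forward the message to the worker actor by name.
          SendTo(multiverso::actor::kWorker, msg);
        });
  }
};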

View file

@@ -11,12 +11,15 @@ enum Role {
kAll = 3
};
void MultiversoInit(int role = kAll);
void MultiversoInit(int* argc = nullptr,
char* argv[] = nullptr,
int role = kAll);
void MultiversoBarrier();
void MultiversoShutDown(bool finalize_mpi = true);
int MultiversoRank();
}
#endif // MULTIVERSO_INCLUDE_MULTIVERSO_H_
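A minimal usage sketch of the new MultiversoInit signature, assuming these functions live in namespace multiverso as the header guard suggests; the role argument and workflow are illustrative, and error handling is omitted.

#include "multiverso/multiverso.h"

int main(int argc, char* argv[]) {
  // argc/argv are now forwarded so the net layer (MPI or ZMQ) can read them.
  multiverso::MultiversoInit(&argc, argv, multiverso::kAll);
  int rank = multiverso::MultiversoRank();
  // ... use rank to partition data, create tables, and train ...
  multiverso::MultiversoBarrier();          // wait for every process
  multiverso::MultiversoShutDown();         // finalize_mpi defaults to true
  return rank >= 0 ? 0 : 1;
}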

View file

@@ -6,6 +6,12 @@
#include "multiverso/message.h"
namespace multiverso {
enum NetThreadLevel {
THREAD_SERIALIZED,
THREAD_MULTIPLE
};
// Interface of the inter-process communication method
class NetInterface {
public:
@@ -17,8 +23,17 @@ public:
virtual int size() const = 0;
virtual int rank() const = 0;
virtual size_t Send(const MessagePtr& msg) = 0;
// \return 1. > 0 sent size
// 2. = 0 nothing sent
// 3. < 0 net error
virtual size_t Send(MessagePtr& msg) = 0;
// \return 1. > 0 received size
// 2. = 0 not received
// 3. < 0 net error
virtual size_t Recv(MessagePtr* msg) = 0;
virtual int thread_level_support() = 0;
};
}
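Given the documented return convention (> 0 bytes handled, == 0 nothing pending), a caller might poll Recv as in this hedged sketch; `net` is assumed to be an already-initialized NetInterface, e.g. one of the wrappers added below.

#include "multiverso/net.h"

void PollOnce(multiverso::NetInterface* net, multiverso::MessagePtr* msg) {
  size_t n = net->Recv(msg);
  if (n > 0) {
    // A complete message arrived; hand it to the communicator/actor layer.
  } else {
    // n == 0: nothing pending. Note the "< 0 net error" case documented above
    // cannot actually be expressed through a size_t return value.
  }
}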

View file

@@ -0,0 +1,211 @@
#ifndef MULTIVERSO_NET_MPI_NET_H_
#define MULTIVERSO_NET_MPI_NET_H_
#ifdef MULTIVERSO_USE_MPI
#include "multiverso/net.h"
#include <limits>
#include <mutex>
#include <queue>
#include "multiverso/message.h"
#include "multiverso/util/log.h"
#include <mpi.h>
#ifdef _MSC_VER
#undef max
#endif
namespace multiverso {
class MPINetWrapper : public NetInterface {
public:
MPINetWrapper() : more_(std::numeric_limits<char>::max()) {}
class MPIMsgHandle {
public:
void add_handle(MPI_Request handle) {
handles_.push_back(handle);
}
void set_msg(MessagePtr& msg) { msg_ = std::move(msg); }
void set_size(size_t size) { size_ = size; }
size_t size() const { return size_; }
void Wait() {
CHECK_NOTNULL(msg_.get());
int count = static_cast<int>(handles_.size());
MPI_Status* status = new MPI_Status[count];
MPI_Waitall(count, handles_.data(), status);
delete[] status;
}
bool Test() {
CHECK_NOTNULL(msg_.get());
int count = static_cast<int>(handles_.size());
MPI_Status* status = new MPI_Status[count];
int flag;
MPI_Testall(count, handles_.data(), &flag, status);
delete[] status;
return flag;
}
private:
std::vector<MPI_Request> handles_;
MessagePtr msg_;
size_t size_;
};
void Init(int* argc, char** argv) override {
// MPI_Init(argc, &argv);
MPI_Initialized(&inited_);
if (!inited_) {
MPI_Init_thread(argc, &argv, MPI_THREAD_SERIALIZED, &thread_provided_);
}
MPI_Query_thread(&thread_provided_);
if (thread_provided_ < MPI_THREAD_SERIALIZED) {
Log::Fatal("At least MPI_THREAD_SERIALIZED supported is needed by multiverso.\n");
}
else if (thread_provided_ == MPI_THREAD_SERIALIZED) {
Log::Info("multiverso MPI-Net is initialized under MPI_THREAD_SERIALIZED mode.\n");
}
else if (thread_provided_ == MPI_THREAD_MULTIPLE) {
Log::Debug("multiverso MPI-Net is initialized under MPI_THREAD_MULTIPLE mode.\n");
}
MPI_Comm_rank(MPI_COMM_WORLD, &rank_);
MPI_Comm_size(MPI_COMM_WORLD, &size_);
MPI_Barrier(MPI_COMM_WORLD);
Log::Debug("%s net util inited, rank = %d, size = %d\n",
name().c_str(), rank(), size());
}
void Finalize() override { MPI_Finalize(); }
int rank() const override { return rank_; }
int size() const override { return size_; }
std::string name() const override { return "MPI"; }
size_t Send(MessagePtr& msg) override {
while (!msg_handles_.empty()) {
MPIMsgHandle* prev = msg_handles_.front();
if (prev->Test()) {
delete prev;
prev = nullptr;
msg_handles_.pop();
} else {
break;
}
}
MPIMsgHandle* handle = new MPIMsgHandle();
size_t size = SendAsync(msg, handle);
handle->set_msg(msg);
handle->set_size(size);
msg_handles_.push(handle);
return size;
}
size_t Recv(MessagePtr* msg) override {
MPI_Status status;
int flag;
// non-blocking probe for an incoming message
MPI_Iprobe(MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &flag, &status);
if (!flag) return 0;
return RecvMsg(msg);
}
int thread_level_support() override {
if (thread_provided_ == MPI_THREAD_MULTIPLE)
return NetThreadLevel::THREAD_MULTIPLE;
return NetThreadLevel::THREAD_SERIALIZED;
}
private:
//size_t SendMsg(const MessagePtr& msg) {
// size_t size = Message::kHeaderSize;
// MPI_Send(msg->header(), Message::kHeaderSize, MPI_BYTE,
// msg->dst(), 0, MPI_COMM_WORLD);
// // Send multiple msg
// for (auto& blob : msg->data()) {
// CHECK_NOTNULL(blob.data());
// MPI_Send(blob.data(), static_cast<int>(blob.size()), MPI_BYTE, msg->dst(),
// 0, MPI_COMM_WORLD);
// size += blob.size();
// }
// // Send an extra over tag indicating the finish of this Message
// MPI_Send(&more_, sizeof(char), MPI_BYTE, msg->dst(),
// 0, MPI_COMM_WORLD);
// // Log::Debug("MPI-Net: rank %d send msg size = %d\n", rank(), size+4);
// return size + sizeof(char);
//}
size_t SendAsync(const MessagePtr& msg,
MPIMsgHandle* msg_handle) {
size_t size = Message::kHeaderSize;
MPI_Request handle;
MPI_Isend(msg->header(), Message::kHeaderSize, MPI_BYTE,
msg->dst(), 0, MPI_COMM_WORLD, &handle);
msg_handle->add_handle(handle);
// Send multiple msg
for (auto& blob : msg->data()) {
CHECK_NOTNULL(blob.data());
MPI_Isend(blob.data(), static_cast<int>(blob.size()), MPI_BYTE, msg->dst(),
0, MPI_COMM_WORLD, &handle);
size += blob.size();
msg_handle->add_handle(handle);
}
// Send an extra "over" tag marking the end of this Message
MPI_Isend(&more_, sizeof(char), MPI_BYTE, msg->dst(),
0, MPI_COMM_WORLD, &handle);
// Log::Debug("MPI-Net: rank %d send msg size = %d\n", rank(), size+4);
msg_handle->add_handle(handle);
return size + sizeof(char);
}
size_t RecvMsg(MessagePtr* msg_ptr) {
if (!msg_ptr->get()) msg_ptr->reset(new Message());
MessagePtr& msg = *msg_ptr;
msg->data().clear();
MPI_Status status;
MPI_Recv(msg->header(), Message::kHeaderSize,
MPI_BYTE, MPI_ANY_SOURCE,
0, MPI_COMM_WORLD, &status);
size_t size = Message::kHeaderSize;
int i = 0;
int num_probe = 0;
while (true) {
int count;
CHECK(MPI_SUCCESS == MPI_Probe(msg->src(), 0, MPI_COMM_WORLD, &status));
MPI_Get_count(&status, MPI_BYTE, &count);
Blob blob(count);
// We only receive from msg->src() until we recv the overtag msg
MPI_Recv(blob.data(), count, MPI_BYTE, msg->src(),
0, MPI_COMM_WORLD, &status);
size += count;
if (count == sizeof(char)) {
if (blob.As<char>() == more_) break;
CHECK(false);
}
msg->Push(blob);
}
// Log::Debug("MPI-Net: rank %d end recv from src %d, size = %d\n", rank(), msg->src(), size);
return size;
}
private:
const char more_;
std::mutex mutex_;
int thread_provided_;
int inited_;
int rank_;
int size_;
std::queue<MPIMsgHandle *> msg_handles_;
};
}
#endif // MULTIVERSO_USE_MPI
#endif // MULTIVERSO_NET_MPI_NET_H_
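A hedged sketch of driving the MPI wrapper directly, using only the calls visible above; real programs would normally go through MultiversoInit instead, and the stop condition here is illustrative.

#include "multiverso/net/mpi_net.h"  // assumed header path for this new file

void RunReceiveLoopExample(int argc, char* argv[]) {
  multiverso::MPINetWrapper net;
  net.Init(&argc, argv);
  multiverso::MessagePtr msg;   // empty; RecvMsg allocates a Message on demand
  bool done = false;            // illustrative stop condition
  while (!done) {
    if (net.Recv(&msg) > 0) {
      // msg now holds the header plus every blob sent before the more_ terminator.
      done = true;              // stop after one message in this sketch
    }
  }
  net.Finalize();
}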

View file

@@ -0,0 +1,165 @@
#ifndef MULTIVERSO_NET_ZMQ_NET_H_
#define MULTIVERSO_NET_ZMQ_NET_H_
#ifdef MULTIVERSO_USE_ZMQ
#include "multiverso/net.h"
#include <limits>
#include "multiverso/message.h"
#include "multiverso/util/log.h"
#include "multiverso/util/net_util.h"
#include <zmq.h>
namespace multiverso {
class ZMQNetWrapper : public NetInterface {
public:
// argc > 2
// argv[1]: machine file, same format as an MPI machine file
// argv[2]: port to use
void Init(int* argc, char** argv) override {
// get machine file
CHECK(*argc > 2);
std::vector<std::string> machine_lists;
ParseMachineFile(argv[1], &machine_lists);
int port = atoi(argv[2]);
size_ = static_cast<int>(machine_lists.size());
CHECK(size_ > 0);
std::unordered_set<std::string> local_ip;
net::GetLocalIPAddress(&local_ip);
context_ = zmq_ctx_new();
zmq_ctx_set(context_, ZMQ_MAX_SOCKETS, 256);
for (auto ip : machine_lists) {
if (local_ip.find(ip) != local_ip.end()) { // my rank
rank_ = static_cast<int>(requester_.size());
requester_.push_back(nullptr);
responder_ = zmq_socket(context_, ZMQ_DEALER);
int rc = zmq_bind(responder_,
("tcp://" + ip + ":" + std::to_string(port)).c_str());
CHECK(rc == 0);
} else {
void* requester = zmq_socket(context_, ZMQ_DEALER);
int rc = zmq_connect(requester,
("tcp://" + ip + ":" + std::to_string(port)).c_str());
CHECK(rc == 0);
requester_.push_back(requester);
}
}
CHECK_NOTNULL(responder_);
Log::Info("%s net util inited, rank = %d, size = %d\n",
name().c_str(), rank(), size());
}
void Finalize() override {
zmq_close(responder_);
for (auto& p : requester_) if (p) zmq_close(p);
zmq_ctx_destroy(context_);
}
int rank() const override { return rank_; }
int size() const override { return size_; }
std::string name() const override { return "ZeroMQ"; }
size_t Send(MessagePtr& msg) override {
size_t size = 0;
int dst = msg->dst();
void* socket = requester_[dst];
CHECK_NOTNULL(socket);
int send_size;
send_size = zmq_send(socket, msg->header(),
Message::kHeaderSize, msg->data().size() > 0 ? ZMQ_SNDMORE : 0);
CHECK(Message::kHeaderSize == send_size);
size += send_size;
for (size_t i = 0; i < msg->data().size(); ++i) {
Blob blob = msg->data()[i];
size_t blob_size = blob.size();
CHECK_NOTNULL(blob.data());
send_size = zmq_send(socket, &blob_size, sizeof(size_t), ZMQ_SNDMORE);
CHECK(send_size == sizeof(size_t));
send_size = zmq_send(socket, blob.data(), static_cast<int>(blob.size()),
i == msg->data().size() - 1 ? 0 : ZMQ_SNDMORE);
CHECK(send_size == blob_size);
size += blob_size + sizeof(size_t);
}
return size;
}
size_t Recv(MessagePtr* msg_ptr) override {
if (!msg_ptr->get()) msg_ptr->reset(new Message());
size_t size = 0;
int recv_size;
size_t blob_size;
int more;
size_t more_size = sizeof(more);
// Receive one Message via multiple zmq_recv calls
CHECK_NOTNULL(msg_ptr);
MessagePtr& msg = *msg_ptr;
msg->data().clear();
CHECK(msg.get());
recv_size = zmq_recv(responder_, msg->header(), Message::kHeaderSize, 0);
if (recv_size < 0) { return -1; }
CHECK(Message::kHeaderSize == recv_size);
size += recv_size;
zmq_getsockopt(responder_, ZMQ_RCVMORE, &more, &more_size);
while (more) {
recv_size = zmq_recv(responder_, &blob_size, sizeof(size_t), 0);
CHECK(recv_size == sizeof(size_t));
size += recv_size;
zmq_getsockopt(responder_, ZMQ_RCVMORE, &more, &more_size);
CHECK(more);
Blob blob(blob_size);
recv_size = zmq_recv(responder_, blob.data(), blob.size(), 0);
CHECK(recv_size == blob_size);
size += recv_size;
msg->Push(blob);
zmq_getsockopt(responder_, ZMQ_RCVMORE, &more, &more_size);
}
return size;
}
int thread_level_support() override {
return NetThreadLevel::THREAD_MULTIPLE;
}
private:
void ParseMachineFile(std::string filename,
std::vector<std::string>* result) {
CHECK_NOTNULL(result);
FILE* file;
char str[32];
int i = 0;
#ifdef _MSC_VER
fopen_s(&file, filename.c_str(), "r");
#else
file = fopen(filename.c_str(), "r");
#endif
CHECK_NOTNULL(file);
#ifdef _MSC_VER
while (fscanf_s(file, "%s", str, 32) > 0) {
#else
while (fscanf(file, "%31s", str) > 0) {
#endif
result->push_back(str);
}
fclose(file);
}
void* context_;
void* responder_;
std::vector<void*> requester_;
int rank_;
int size_;
};
} // namespace multiverso
#endif // MULTIVERSO_USE_ZEROMQ
#endif // MULTIVERSO_NET_ZMQ_NET_H_
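Since Init reads a machine file and a port from argv, the hedged sketch below shows what it expects; the file name and port number are made up.

#include "multiverso/net/zmq_net.h"  // assumed header path for this new file

void InitZmqExample() {
  // A launch such as  ./my_app machines.txt 55555  reaches Init as:
  char arg0[] = "my_app", arg1[] = "machines.txt", arg2[] = "55555";
  char* argv_example[] = { arg0, arg1, arg2, nullptr };
  int argc_example = 3;
  multiverso::ZMQNetWrapper net;
  // argv[1]: machine file, one IP per line; argv[2]: TCP port each process binds/connects on.
  net.Init(&argc_example, argv_example);
  // ... Send/Recv as with any NetInterface ...
  net.Finalize();
}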

View file

@@ -0,0 +1,153 @@
#ifndef MULTIVERSO_ADAM_ARRAY_TABLE_H_
#define MULTIVERSO_ADAM_ARRAY_TABLE_H_
#include "multiverso/table_interface.h"
#include "multiverso/util/log.h"
#include "multiverso/zoo.h"
namespace multiverso {
// Adam: a first-order gradient-based optimizer for stochastic objective functions,
// based on adaptive estimates of lower-order moments
// (Diederik P. Kingma and Jimmy Lei Ba)
template <typename T>
class AdamArrayWorker : public WorkerTable {
public:
explicit AdamArrayWorker(size_t size) : WorkerTable(), size_(size) {
num_server_ = Zoo::Get()->num_servers();
server_offsets_.push_back(0);
CHECK(size_ > Zoo::Get()->num_servers());
int length = static_cast<int>(size_) / Zoo::Get()->num_servers();
for (int i = 1; i < Zoo::Get()->num_servers(); ++i) {
server_offsets_.push_back(i * length);
}
server_offsets_.push_back(size_);
Log::Debug("worker %d create AdamArrayTable with %d elements.\n", Zoo::Get()->rank(), size);
}
T* raw() { return data_; }
// Get all elements
// data is user-allocated memory
void Get(T* data, size_t size) {
CHECK(size == size_);
data_ = data;
int all_key = -1;
Blob whole_table(&all_key, sizeof(int));
WorkerTable::Get(whole_table);
Log::Debug("worker %d getting all parameters.\n", Zoo::Get()->rank());
}
// Add all elements
void Add(T* data, size_t size, float smooth_momentum = 0.0) {
CHECK(size == size_);
int all_key = -1;
Blob key(&all_key, sizeof(int));
Blob val(data, sizeof(T) * size);
decay_momentum_rate_first_ = smooth_momentum;
WorkerTable::Add(key, val);
Log::Debug("worker %d adding parameters with size of %d.\n", Zoo::Get()->rank(), size);
}
int Partition(const std::vector<Blob>& kv,
std::unordered_map<int, std::vector<Blob> >* out) override {
CHECK(kv.size() == 1 || kv.size() == 2); // kv.size() == 1 : get msg;
// kv.size() == 2 : add msg;
for (int i = 0; i < num_server_; ++i)
{
(*out)[i].push_back(kv[0]);
}
if (kv.size() == 2)
{
CHECK(kv[1].size() == size_ * sizeof(T));
for (int i = 0; i < num_server_; ++i)
{
Blob blob(kv[1].data() + server_offsets_[i] * sizeof(T),
(server_offsets_[i + 1] - server_offsets_[i]) * sizeof(T));
(*out)[i].push_back(blob);
Blob momentum(&decay_momentum_rate_first_, sizeof(float)); // send the smoothing coefficient to the server
(*out)[i].push_back(momentum);
}
}
return num_server_;
}
void ProcessReplyGet(std::vector<Blob>& reply_data) override {
CHECK(reply_data.size() == 2);
int id = (reply_data[0]).As<int>();
CHECK(reply_data[1].size<T>() == (server_offsets_[id+1] - server_offsets_[id]));
// TODO(qiwye): is there a way to reduce this memcpy?
memcpy(data_ + server_offsets_[id], reply_data[1].data(), reply_data[1].size());
}
private:
T* data_; // not owned
size_t size_;
int num_server_;
float decay_momentum_rate_first_;
std::vector<size_t> server_offsets_;
};
// The storage is a contiguous large chunk of memory
template <typename T>
class AdamArrayServer : public ServerTable {
public:
explicit AdamArrayServer(size_t size) : ServerTable() {
server_id_ = Zoo::Get()->rank();
size_ = size / Zoo::Get()->size();
if (server_id_ == Zoo::Get()->num_servers()-1) { // last server
size_ += size % Zoo::Get()->num_servers();
}
storage_.resize(size_);
smooth_gradient_first_.resize(size_);
stepsize_ = 0.001;
decay_momentum_rate_first_ = 0.9f;
decay_momentum_rate_second_ = 0.99f; // default setting in paper
Log::Debug("server %d create AdamArrayTable with %d elements of %d elements.\n", server_id_, size_ * 2, size * 2);
}
void ProcessAdd(const std::vector<Blob>& data) override {
#ifdef MULTIVERSO_USE_BLAS
// MKL update
Log::Fatal("Not implemented yet\n");
#else
Blob keys = data[0], values = data[1];
decay_momentum_rate_first_ = data[2].As<float>();
CHECK(keys.size<int>() == 1 && keys.As<int>() == -1); // Always request whole table
CHECK(values.size() == size_ * sizeof(T));
for (int i = 0; i < size_; ++i)
{
smooth_gradient_first_[i] = decay_momentum_rate_first_ * smooth_gradient_first_[i] + (1 - decay_momentum_rate_first_) * values.As<T>(i);
storage_[i] += smooth_gradient_first_[i];
}
#endif
}
void ProcessGet(const std::vector<Blob>& data,
std::vector<Blob>* result) override {
size_t key_size = data[0].size<int>();
CHECK(key_size == 1 && data[0].As<int>() == -1); // Always request the whole table
Blob key(sizeof(int)); key.As<int>() = server_id_;
Blob value(storage_.data(), sizeof(T) * size_);
result->push_back(key);
result->push_back(value);
}
private:
int server_id_;
float decay_momentum_rate_first_;
float decay_momentum_rate_second_;
float stepsize_;
std::vector<T> storage_;
std::vector<T> smooth_gradient_first_;
std::vector<T> smooth_gradient_second_;
size_t size_; // number of elements of type T
};
}
#endif // MULTIVERSO_ADAM_ARRAY_TABLE_H_
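For reference, the full Adam rule from Kingma and Ba keeps both a first- and a second-moment estimate; the self-contained sketch below shows that per-element update with bias correction omitted, whereas ProcessAdd above currently applies only the first-moment smoothing. All names here are illustrative, not part of this commit.

#include <cmath>
#include <cstddef>
#include <vector>

// Hedged sketch of the per-element Adam update; NOT the code committed above.
template <typename T>
void AdamStep(std::vector<T>& weights, const std::vector<T>& grad,
              std::vector<T>& m, std::vector<T>& v,   // first / second moment estimates
              T stepsize = T(0.001), T beta1 = T(0.9),
              T beta2 = T(0.999), T eps = T(1e-8)) {
  for (std::size_t i = 0; i < weights.size(); ++i) {
    m[i] = beta1 * m[i] + (1 - beta1) * grad[i];             // smooth the gradient
    v[i] = beta2 * v[i] + (1 - beta2) * grad[i] * grad[i];   // smooth the squared gradient
    weights[i] -= stepsize * m[i] / (std::sqrt(v[i]) + eps); // adaptively scaled step
  }
}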

View file

@@ -1,5 +1,5 @@
#ifndef MULTIVERSO_STATIC_TABLE_H_
#define MULTIVERSO_STATIC_TABLE_H_
#ifndef MULTIVERSO_ARRAY_TABLE_H_
#define MULTIVERSO_ARRAY_TABLE_H_
#include "multiverso/table_interface.h"
#include "multiverso/util/log.h"
@@ -128,4 +128,4 @@ private:
};
}
#endif // MULTIVERSO_STATIC_TABLE_H_
#endif // MULTIVERSO_ARRAY_TABLE_H_

View file

@@ -0,0 +1,145 @@
#ifndef MULTIVERSO_SMOOTH_ARRAY_TABLE_H_
#define MULTIVERSO_SMOOTH_ARRAY_TABLE_H_
#include "multiverso/table_interface.h"
#include "multiverso/util/log.h"
#include "multiverso/zoo.h"
namespace multiverso {
// A distributed shared std::vector<T> table
template <typename T>
class SmoothArrayWorker : public WorkerTable {
public:
explicit SmoothArrayWorker(size_t size) : WorkerTable(), size_(size) {
num_server_ = Zoo::Get()->num_servers();
server_offsets_.push_back(0);
CHECK(size_ > Zoo::Get()->num_servers());
int length = static_cast<int>(size_) / Zoo::Get()->num_servers();
for (int i = 1; i < Zoo::Get()->num_servers(); ++i) {
server_offsets_.push_back(i * length);
}
server_offsets_.push_back(size_);
Log::Debug("worker %d create SmoothArrayTable with %d elements.\n", Zoo::Get()->rank(), size);
}
T* raw() { return data_; }
// Get all elements
// data is user-allocated memory
void Get(T* data, size_t size) {
CHECK(size == size_);
data_ = data;
int all_key = -1;
Blob whole_table(&all_key, sizeof(int));
WorkerTable::Get(whole_table);
Log::Debug("worker %d getting all parameters.\n", Zoo::Get()->rank());
}
// Add all elements
void Add(T* data, size_t size, float smooth_momentum = 0.0) {
CHECK(size == size_);
int all_key = -1;
Blob key(&all_key, sizeof(int));
Blob val(data, sizeof(T) * size);
smooth_momentum_ = smooth_momentum;
WorkerTable::Add(key, val);
Log::Debug("worker %d adding parameters with size of %d.\n", Zoo::Get()->rank(), size);
}
int Partition(const std::vector<Blob>& kv,
std::unordered_map<int, std::vector<Blob> >* out) override {
CHECK(kv.size() == 1 || kv.size() == 2); // kv.size() == 1 : get msg;
// kv.size() == 2 : add msg;
for (int i = 0; i < num_server_; ++i)
{
(*out)[i].push_back(kv[0]);
}
if (kv.size() == 2)
{
CHECK(kv[1].size() == size_ * sizeof(T));
for (int i = 0; i < num_server_; ++i)
{
Blob blob(kv[1].data() + server_offsets_[i] * sizeof(T),
(server_offsets_[i + 1] - server_offsets_[i]) * sizeof(T));
(*out)[i].push_back(blob);
Blob momentum(&smooth_momentum_, sizeof(float)); // send the smoothing coefficient to the server
(*out)[i].push_back(momentum);
}
}
return num_server_;
}
void ProcessReplyGet(std::vector<Blob>& reply_data) override {
CHECK(reply_data.size() == 2);
int id = (reply_data[0]).As<int>();
CHECK(reply_data[1].size<T>() == (server_offsets_[id + 1] - server_offsets_[id]));
// TODO(qiwye): is there a way to reduce this memcpy?
memcpy(data_ + server_offsets_[id], reply_data[1].data(), reply_data[1].size());
}
private:
T* data_; // not owned
size_t size_;
int num_server_;
float smooth_momentum_;
std::vector<size_t> server_offsets_;
};
// The storage is a contiguous large chunk of memory
template <typename T>
class SmoothArrayServer : public ServerTable {
public:
explicit SmoothArrayServer(size_t size) : ServerTable() {
server_id_ = Zoo::Get()->rank();
size_ = size / Zoo::Get()->size();
if (server_id_ == Zoo::Get()->num_servers() - 1) { // last server
size_ += size % Zoo::Get()->num_servers();
}
storage_.resize(size_);
smooth_gradient_.resize(size_);
smooth_momentum_ = 0.0f;
Log::Debug("server %d create SmoothArrayTable with %d elements of %d elements.\n", server_id_, size_ * 2, size * 2);
}
void ProcessAdd(const std::vector<Blob>& data) override {
#ifdef MULTIVERSO_USE_BLAS
// MKL update
Log::Fatal("Not implemented yet\n");
#else
Blob keys = data[0], values = data[1];
smooth_momentum_ = data[2].As<float>();
CHECK(keys.size<int>() == 1 && keys.As<int>() == -1); // Always request whole table
CHECK(values.size() == size_ * sizeof(T));
for (int i = 0; i < size_; ++i)
{
smooth_gradient_[i] = smooth_momentum_ * smooth_gradient_[i] + (1 - smooth_momentum_) * values.As<T>(i);
storage_[i] += smooth_gradient_[i];
}
#endif
}
void ProcessGet(const std::vector<Blob>& data,
std::vector<Blob>* result) override {
size_t key_size = data[0].size<int>();
CHECK(key_size == 1 && data[0].As<int>() == -1); // Always request the whole table
Blob key(sizeof(int)); key.As<int>() = server_id_;
Blob value(storage_.data(), sizeof(T) * size_);
result->push_back(key);
result->push_back(value);
}
private:
int server_id_;
float smooth_momentum_;
std::vector<T> storage_;
std::vector<T> smooth_gradient_;
size_t size_; // number of elements of type T
};
}
#endif // MULTIVERSO_SMOOTH_ARRAY_TABLE_H_
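A hedged worker-side usage sketch for the smooth array table, using only the Get/Add methods declared above; the header path, table size, iteration count, and momentum value are illustrative.

#include <vector>
#include "multiverso/smooth_array_table.h"  // assumed header path

void SmoothTrainLoopExample(int num_iters) {
  const size_t kSize = 1 << 20;             // made-up table size
  std::vector<float> params(kSize), grads(kSize);
  multiverso::SmoothArrayWorker<float> table(kSize);
  for (int iter = 0; iter < num_iters; ++iter) {
    table.Get(params.data(), kSize);        // pull the latest parameters
    // ... compute grads from params and local data ...
    table.Add(grads.data(), kSize, 0.9f);   // server blends: s = 0.9*s + 0.1*g, then w += s
  }
}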

View file

@@ -3,6 +3,7 @@
#ifndef MULTIVERSO_MT_QUEUE_H_
#define MULTIVERSO_MT_QUEUE_H_
#include <atomic>
#include <queue>
#include <mutex>
#include <condition_variable>
@@ -16,7 +17,7 @@ template<typename T>
class MtQueue {
public:
/*! \brief Constructor */
MtQueue() : exit_(false) {}
MtQueue() { exit_.store(false); }
/*!
* \brief Push an element into the queue. the function is based on
@@ -60,13 +61,16 @@ public:
/*! \brief Exit queue, awake all threads blocked by the queue */
void Exit();
bool Alive();
private:
/*! the underlying container of queue */
std::queue<T> buffer_;
mutable std::mutex mutex_;
std::condition_variable empty_condition_;
/*! whether the queue is still alive */
bool exit_;
std::atomic_bool exit_;
// bool exit_;
// No copying allowed
MtQueue(const MtQueue&);
@@ -126,9 +130,14 @@ bool MtQueue<T>::Empty() const {
template<typename T>
void MtQueue<T>::Exit() {
std::unique_lock<std::mutex> lock(mutex_);
exit_ = true;
exit_.store(true);
empty_condition_.notify_all();
}
template<typename T>
bool MtQueue<T>::Alive() {
return exit_ == false;
}
}
#endif // MULTIVERSO_MT_QUEUE_H_
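A hedged producer/consumer sketch around the new Alive()/Exit() pair; Pop blocking until data arrives or Exit() is called is an assumption based on the surrounding comments, since Pop itself is not shown in this hunk.

#include <thread>
#include "multiverso/util/mt_queue.h"  // assumed header path

void QueueShutdownExample() {
  multiverso::MtQueue<int> queue;
  std::thread consumer([&queue]() {
    int item;
    while (queue.Alive()) {
      if (queue.Pop(item)) {  // assumed: blocks, returns false once Exit() fires
        // process item
      }
    }
  });
  int value = 42;
  queue.Push(value);          // Push signature assumed to take an lvalue reference
  queue.Exit();               // wakes any blocked consumer so the thread can finish
  consumer.join();
}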

View file

@@ -2,22 +2,30 @@
#define MULTIVERSO_UTIL_NET_UTIL_H_
#include <string>
#include <unordered_set>
namespace multiverso {
namespace net {
std::string GetHostName() {
return "";
}
//std::string GetHostName() {
// return "";
//}
//
//std::string HostNameToIP(std::string hostname) {
// return "";
//}
//
//std::string IPToHostName(std::string ip) {
// return "";
//}
//
//bool IsLocalAddress(std::string ip) {
// return true;
//}
std::string HostNameToIP(std::string hostname) {
return "";
}
void GetLocalIPAddress(std::unordered_set<std::string>* result);
std::string IPToHostName(std::string ip) {
return "";
}
}
} // namespace net
} // namespace multiverso
}
#endif // MULTIVERSO_UTIL_NET_UTIL_H_
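The only declaration left uncommented here is GetLocalIPAddress; a minimal call site, assuming it fills the set with this machine's IP strings the way the ZMQ wrapper above expects.

#include <string>
#include <unordered_set>
#include "multiverso/util/net_util.h"

bool IsLocalExample(const std::string& ip_from_machine_file) {
  std::unordered_set<std::string> local_ip;
  multiverso::net::GetLocalIPAddress(&local_ip);
  // This membership test is how ZMQNetWrapper decides which machine-file entry is itself.
  return local_ip.count(ip_from_machine_file) > 0;
}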

View file

@@ -23,14 +23,14 @@ public:
static Zoo* Get() { static Zoo zoo; return &zoo; };
// Start all actors
void Start(int role);
void Start(int* argc, char** argv, int role);
// Stop all actors
void Stop(bool finalize_net);
void Barrier();
void Deliver(const std::string& name, MessagePtr&);
void Accept(MessagePtr& msg);
void SendTo(const std::string& name, MessagePtr&);
void Receive(MessagePtr& msg);
int rank() const;
int size() const;
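A hedged sketch of driving the renamed Zoo entry points directly (they are normally wrapped by MultiversoInit/MultiversoShutDown); the role value and the message flow in the comment are illustrative.

#include "multiverso/multiverso.h"  // for the Role enum
#include "multiverso/zoo.h"

void ZooLifecycleExample(int argc, char* argv[]) {
  multiverso::Zoo* zoo = multiverso::Zoo::Get();
  zoo->Start(&argc, argv, multiverso::kAll);  // argc/argv now flow down to the net layer
  // ... actors exchange messages via zoo->SendTo(name, msg) and zoo->Receive(msg) ...
  zoo->Barrier();
  zoo->Stop(true);                            // also finalize the net layer
}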

Binary data
Source/Multiverso/x64/release/libmultiverso.a

Binary file not shown.