Add dashboard to monitor the system excuation information for profile

This commit is contained in:
feiga 2016-03-24 14:35:42 +08:00
Родитель e9efbb26cc
Коммит 195317b6a7
18 изменённых файлов: 335 добавлений и 54 удалений

Просмотреть файл

@ -171,6 +171,7 @@
<ClInclude Include="include\multiverso\blob.h" />
<ClInclude Include="include\multiverso\communicator.h" />
<ClInclude Include="include\multiverso\controller.h" />
<ClInclude Include="include\multiverso\dashboard.h" />
<ClInclude Include="include\multiverso\message.h" />
<ClInclude Include="include\multiverso\multiverso.h" />
<ClInclude Include="include\multiverso\net.h" />
@ -193,16 +194,19 @@
<ClInclude Include="include\multiverso\util\waiter.h" />
<ClInclude Include="include\multiverso\worker.h" />
<ClInclude Include="include\multiverso\zoo.h" />
<ClInclude Include="table_factory.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="src\actor.cpp" />
<ClCompile Include="src\communicator.cpp" />
<ClCompile Include="src\controller.cpp" />
<ClCompile Include="src\dashboard.cpp" />
<ClCompile Include="src\multiverso.cpp" />
<ClCompile Include="src\net.cpp" />
<ClCompile Include="src\node.cpp" />
<ClCompile Include="src\server.cpp" />
<ClCompile Include="src\table.cpp" />
<ClCompile Include="src\timer.cpp" />
<ClCompile Include="src\util\log.cpp" />
<ClCompile Include="src\util\net_util.cpp" />
<ClCompile Include="src\worker.cpp" />

Просмотреть файл

@ -79,6 +79,12 @@
<ClInclude Include="include\multiverso\util\timer.h">
<Filter>util</Filter>
</ClInclude>
<ClInclude Include="table_factory.h">
<Filter>include</Filter>
</ClInclude>
<ClInclude Include="include\multiverso\dashboard.h">
<Filter>system</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="include">
@ -134,5 +140,11 @@
<ClCompile Include="src\net.cpp">
<Filter>system</Filter>
</ClCompile>
<ClCompile Include="src\timer.cpp">
<Filter>util</Filter>
</ClCompile>
<ClCompile Include="src\dashboard.cpp">
<Filter>system</Filter>
</ClCompile>
</ItemGroup>
</Project>

Просмотреть файл

@ -83,6 +83,9 @@
<OptimizeReferences>true</OptimizeReferences>
<AdditionalDependencies>IMultiverso.lib;kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;%(AdditionalDependencies)</AdditionalDependencies>
</Link>
<ProjectReference>
<UseLibraryDependencyInputs>false</UseLibraryDependencyInputs>
</ProjectReference>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="main.cpp" />

Просмотреть файл

@ -40,7 +40,6 @@ void TestKV(int argc, char* argv[]) {
KVWorkerTable<int, int>* dht = new KVWorkerTable<int, int>();
// if the node is server, then create a server storage table
KVServerTable<int, int>* server_dht = new KVServerTable<int, int>();
Log::Info("table name %s", server_dht->name().c_str());
MV_Barrier();

Просмотреть файл

@ -0,0 +1,81 @@
#ifndef MULTIVERSO_INCLUDE_DASHBOARD_H_
#define MULTIVERSO_INCLUDE_DASHBOARD_H_
#include <map>
#include <mutex>
#include <string>
#include "multiverso/util/timer.h"
namespace multiverso {
class Monitor;
// Dashboard to record and query system running information
// thread safe
class Dashboard {
public:
static void AddMonitor(const std::string& name, Monitor* monitor);
static std::string Watch(const std::string& name);
static void Display();
private:
static std::map<std::string, Monitor*> record_;
static std::mutex m_;
};
class Monitor {
public:
explicit Monitor(const std::string& name) {
name_ = name;
timer_.Start();
Dashboard::AddMonitor(name_, this);
}
void Begin() { timer_.Start(); }
void End() {
elapse_ += timer_.elapse();
++count_;
}
double average() const { return elapse_ / count_; };
std::string name() const { return name_; }
double elapse() const { return elapse_; }
int count() const { return count_; }
std::string info_string() const;
private:
// name of the Monitor
std::string name_;
// total elapsed time
double elapse_;
// count of monitor
int count_;
// a timer util
Timer timer_;
};
// NOTE(feiga): user shouldn't call this, used in MONITOR_BEGIN as a local var
// note that static variable under different context may differs
// please use either only in global scope or only in local scope
#define REGISTER_MONITOR(name) \
static Monitor g_##name##_monitor(#name);
// Guard with MONITOR macro in the code to monitor it's excuation
// Usage:
// MONITOR_BEGIN(your_code)
// your code
// MONITOR_END(your_code)
#define MONITOR_BEGIN(name) \
REGISTER_MONITOR(name) \
g_##name##_monitor.Begin();
#define MONITOR_END(name) \
g_##name##_monitor.End();
}
#endif // MULTIVERSO_INCLUDE_DASHBOARD_H_

Просмотреть файл

@ -3,16 +3,9 @@
namespace multiverso {
enum Role {
Null = 0,
Worker = 1,
Server = 2,
All = 3
};
void MV_Init(int* argc = nullptr,
char* argv[] = nullptr,
int role = All);
int role = 3);
void MV_Barrier();

Просмотреть файл

@ -10,6 +10,7 @@
#include <queue>
#include "multiverso/message.h"
#include "multiverso/dashboard.h"
#include "multiverso/util/log.h"
#include "multiverso/util/mt_queue.h"
@ -198,6 +199,7 @@ public:
size_t SerializeAndSend(MessagePtr& msg, MPIMsgHandle* msg_handle) {
CHECK_NOTNULL(msg_handle);
MONITOR_BEGIN(MPI_NET_SEND_SERIALIZE);
size_t size = sizeof(int) + Message::kHeaderSize;
for (auto& data : msg->data()) size += sizeof(int) + data.size();
if (size > send_size_) {
@ -207,17 +209,18 @@ public:
memcpy(send_buffer_, msg->header(), Message::kHeaderSize);
char* p = send_buffer_ + Message::kHeaderSize;
for (auto& data : msg->data()) {
int s = data.size();
memcpy(p, &s, sizeof(int));
p += sizeof(int);
size_t s = data.size();
memcpy(p, &s, sizeof(size_t));
p += sizeof(size_t);
memcpy(p, data.data(), s);
p += s;
}
int over = -1;
memcpy(p, &over, sizeof(int));
MONITOR_END(MPI_NET_SEND_SERIALIZE);
MPI_Request handle;
MV_MPI_CALL(MPI_Isend(send_buffer_, size, MPI_BYTE, msg->dst(), 0, MPI_COMM_WORLD, &handle));
MV_MPI_CALL(MPI_Isend(send_buffer_, static_cast<int>(size), MPI_BYTE, msg->dst(), 0, MPI_COMM_WORLD, &handle));
msg_handle->add_handle(handle);
return size;
}
@ -229,6 +232,8 @@ public:
MPI_Status status;
MV_MPI_CALL(MPI_Recv(recv_buffer_, count,
MPI_BYTE, src, 0, MPI_COMM_WORLD, &status));
MONITOR_BEGIN(MPI_NET_RECV_DESERIALIZE)
char* p = recv_buffer_;
int s;
memcpy(msg->header(), p, Message::kHeaderSize);
@ -243,6 +248,7 @@ public:
memcpy(&s, p, sizeof(int));
p += sizeof(int);
}
MONITOR_END(MPI_NET_RECV_DESERIALIZE)
return count;
}
@ -317,9 +323,9 @@ private:
std::unique_ptr<MPIMsgHandle> last_handle_;
MtQueue<MessagePtr> send_queue_;
char* send_buffer_;
int send_size_;
size_t send_size_;
char* recv_buffer_;
int recv_size_;
size_t recv_size_;
};
}

Просмотреть файл

@ -3,6 +3,11 @@
namespace multiverso {
enum Role {
WORKER = 1,
SERVER = 2
};
struct Node {
int rank;
// role can be 0, 1, 2, 3

Просмотреть файл

@ -16,10 +16,9 @@ public:
WorkerTable();
virtual ~WorkerTable() {}
void Get(Blob keys) { Wait(GetAsync(keys)); }
void Add(Blob keys, Blob values) { Wait(AddAsync(keys, values)); }
void Get(Blob keys);
void Add(Blob keys, Blob values);
// TODO(feiga): add call back
int GetAsync(Blob keys);
int AddAsync(Blob keys, Blob values);
@ -34,8 +33,6 @@ public:
virtual void ProcessReplyGet(std::vector<Blob>&) = 0;
const std::string name() { return std::string(typeid(this).name());};
// add user defined data structure
private:
std::string table_name_;
@ -44,6 +41,16 @@ private:
int msg_id_;
};
// TODO(feiga): move to a seperate file
class Stream;
// interface for checkpoint table
class Serializable {
public:
virtual void Store(Stream* s) = 0;
virtual void Load(Stream* s) = 0;
};
// discribe the server parameter storage data structure and related method
class ServerTable {
public:
@ -52,35 +59,6 @@ public:
virtual void ProcessAdd(const std::vector<Blob>& data) = 0;
virtual void ProcessGet(const std::vector<Blob>& data,
std::vector<Blob>* result) = 0;
const std::string name() const { return std::string(typeid(this).name());};
// add user defined server process logic
void Process(const std::string instruction, const std::vector<Blob>& data, std::vector<Blob>* result = nullptr);
// add user defined server storage data structure
};
// TODO(feiga): provide better table creator method
// Abstract Factory to create server and worker
class TableFactory {
// static TableFactory* GetTableFactory();
virtual WorkerTable* CreateWorker() = 0;
virtual ServerTable* CreateServer() = 0;
static TableFactory* fatory_;
};
namespace table {
}
class TableBuilder {
public:
TableBuilder& SetArribute(const std::string& name, const std::string& val);
WorkerTable* WorkerTableBuild();
ServerTable* ServerTableBuild();
private:
std::unordered_map<std::string, std::string> params_;
};
}

Просмотреть файл

@ -2,7 +2,24 @@
#define MULTIVERSO_TIMER_H_
#include <chrono>
#include <ctime>
namespace multiverso {
class Timer {
public:
Timer();
// (Re)start the timer
void Start();
// Get elapsed milliseconds since last Timer::Start
double elapse();
private:
using TimePoint = std::chrono::system_clock::time_point;
TimePoint start_point_;
};
}
#endif //MULTIVERSO_TIMER_H_

42
next/src/dashboard.cpp Normal file
Просмотреть файл

@ -0,0 +1,42 @@
#include "multiverso/dashboard.h"
#include "multiverso/util/log.h"
#include <sstream>
namespace multiverso {
std::map<std::string, Monitor*> Dashboard::record_;
std::mutex Dashboard::m_;
void Dashboard::AddMonitor(const std::string& name, Monitor* monitor) {
std::lock_guard<std::mutex> l(m_);
CHECK(record_[name] == nullptr);
record_[name] = monitor;
Log::Info("Add monitor %s\n", name.c_str());
}
std::string Dashboard::Watch(const std::string& name) {
std::lock_guard<std::mutex> l(m_);
std::string result;
if (record_.find(name) == record_.end()) return result;
Monitor* monitor = record_[name];
CHECK_NOTNULL(monitor);
return monitor->info_string();
}
std::string Monitor::info_string() const {
std::ostringstream oss;
oss << "Monitor (" << name_ << ")] "
<< " count = " << count_
<< " elapse = " << elapse_ << "ms"
<< " average = " << average() << "ms";
return oss.str();
}
void Dashboard::Display() {
std::lock_guard<std::mutex> l(m_);
Log::Info("--------------Show dashboard monitor information--------------\n");
for (auto& it : record_) Log::Info("%s\n", it.second->info_string().c_str());
Log::Info("--------------------------------------------------------------\n");
}
}

Просмотреть файл

@ -5,8 +5,8 @@ namespace multiverso {
Node::Node() : rank(-1), role(-1), worker_id(-1), server_id(-1) {}
namespace node {
bool is_worker(int role) { return (role & 1) != 0; }
bool is_server(int role) { return (role & 2) != 0; }
bool is_worker(int role) { return (role & Role::WORKER) != 0; }
bool is_server(int role) { return (role & Role::SERVER) != 0; }
}

Просмотреть файл

@ -4,6 +4,8 @@
#include "multiverso/table_interface.h"
#include "multiverso/zoo.h"
#include "multiverso/dashboard.h"
namespace multiverso {
Server::Server() : Actor(actor::kServer) {
@ -20,19 +22,23 @@ int Server::RegisterTable(ServerTable* server_table) {
}
void Server::ProcessGet(MessagePtr& msg) {
MONITOR_BEGIN(SERVER_PROCESS_GET);
MessagePtr reply(msg->CreateReplyMessage());
int table_id = msg->table_id();
CHECK(table_id >= 0 && table_id < store_.size());
store_[table_id]->ProcessGet(msg->data(), &reply->data());
SendTo(actor::kCommunicator, reply);
MONITOR_END(SERVER_PROCESS_GET);
}
void Server::ProcessAdd(MessagePtr& msg) {
MONITOR_BEGIN(SERVER_PROCESS_ADD)
MessagePtr reply(msg->CreateReplyMessage());
int table_id = msg->table_id();
CHECK(table_id >= 0 && table_id < store_.size());
store_[table_id]->ProcessAdd(msg->data());
SendTo(actor::kCommunicator, reply);
MONITOR_END(SERVER_PROCESS_ADD)
}
}

Просмотреть файл

@ -1,8 +1,8 @@
#include "multiverso/table_interface.h"
#include "multiverso/util/log.h"
#include "multiverso/util/waiter.h"
#include "multiverso/zoo.h"
#include "multiverso/dashboard.h"
namespace multiverso {
@ -14,6 +14,18 @@ ServerTable::ServerTable() {
Zoo::Get()->RegisterTable(this);
}
void WorkerTable::Get(Blob keys) {
MONITOR_BEGIN(WORKER_TABLE_SYNC_GET)
Wait(GetAsync(keys));
MONITOR_END(WORKER_TABLE_SYNC_GET)
}
void WorkerTable::Add(Blob keys, Blob values) {
MONITOR_BEGIN(WORKER_TABLE_SYNC_ADD)
Wait(AddAsync(keys, values));
MONITOR_END(WORKER_TABLE_SYNC_ADD)
}
int WorkerTable::GetAsync(Blob keys) {
int id = msg_id_++;
waitings_[id] = new Waiter();

20
next/src/timer.cpp Normal file
Просмотреть файл

@ -0,0 +1,20 @@
#include "multiverso/util/timer.h"
namespace multiverso {
Timer::Timer() {
Start();
}
void Timer::Start() {
start_point_ = std::chrono::high_resolution_clock::now();
}
double Timer::elapse() {
TimePoint end_point = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> time_ms =
end_point - start_point_;
return time_ms.count();
}
}

Просмотреть файл

@ -1,4 +1,6 @@
#include "multiverso/worker.h"
#include "multiverso/dashboard.h"
#include "multiverso/util/mt_queue.h"
#include "multiverso/zoo.h"
@ -21,6 +23,7 @@ int Worker::RegisterTable(WorkerTable* worker_table) {
}
void Worker::ProcessGet(MessagePtr& msg) {
MONITOR_BEGIN(WORKER_PROCESS_GET)
int table_id = msg->table_id();
int msg_id = msg->msg_id();
std::unordered_map<int, std::vector<Blob>> partitioned_key;
@ -36,9 +39,11 @@ void Worker::ProcessGet(MessagePtr& msg) {
msg->set_data(it.second);
SendTo(actor::kCommunicator, msg);
}
MONITOR_END(WORKER_PROCESS_GET)
}
void Worker::ProcessAdd(MessagePtr& msg) {
MONITOR_BEGIN(WORKER_PROCESS_ADD)
int table_id = msg->table_id();
int msg_id = msg->msg_id();
std::unordered_map<int, std::vector<Blob>> partitioned_kv;
@ -47,7 +52,7 @@ void Worker::ProcessAdd(MessagePtr& msg) {
int num = cache_[table_id]->Partition(msg->data(), &partitioned_kv);
cache_[table_id]->Reset(msg_id, num);
for (auto& it : partitioned_kv) {
MessagePtr msg(new Message()); // = std::make_unique<Message>();
MessagePtr msg(new Message());
msg->set_src(Zoo::Get()->rank());
msg->set_dst(it.first);
msg->set_type(MsgType::Request_Add);
@ -56,12 +61,15 @@ void Worker::ProcessAdd(MessagePtr& msg) {
msg->set_data(it.second);
SendTo(actor::kCommunicator, msg);
}
MONITOR_END(WORKER_PROCESS_ADD)
}
void Worker::ProcessReplyGet(MessagePtr& msg) {
MONITOR_BEGIN(WORKER_PROCESS_REPLY_GET)
int table_id = msg->table_id();
cache_[table_id]->ProcessReplyGet(msg->data());
cache_[table_id]->Notify(msg->msg_id());
MONITOR_END(WORKER_PROCESS_REPLY_GET)
}
void Worker::ProcessReplyAdd(MessagePtr& msg) {

Просмотреть файл

@ -8,6 +8,7 @@
#include "multiverso/worker.h"
#include "multiverso/server.h"
#include "multiverso/controller.h"
#include "multiverso/dashboard.h"
namespace multiverso {
@ -42,6 +43,8 @@ void Zoo::Stop(bool finalize_net) {
// Stop the system
Barrier();
Dashboard::Display();
// Stop all actors
for (auto actor : zoo_) { actor.second->Stop(); }
// Stop the network

92
next/table_factory.h Normal file
Просмотреть файл

@ -0,0 +1,92 @@
#ifndef MULTIVERSO_INCLUDE_TABLE_FACTORY_H_
#define MULTIVERSO_INCLUDE_TABLE_FACTORY_H_
#include <functional>
#include <string>
#include <map>
#include "multiverso/util/log.h"
namespace multiverso {
// TODO(feiga): Refine
// TODO(feiga): provide better table creator method
// Abstract Factory to create server and worker
//class TableFactory {
// // static TableFactory* GetTableFactory();
// virtual WorkerTable* CreateWorker() = 0;
// virtual ServerTable* CreateServer() = 0;
// static TableFactory* fatory_;
//};
//namespace table {
//}
//class TableBuilder {
//public:
// TableBuilder& SetArribute(const std::string& name, const std::string& val);
// WorkerTable* WorkerTableBuild();
// ServerTable* ServerTableBuild();
//private:
// std::string Get(const std::string& name) const;
// std::unordered_map<std::string, std::string> params_;
//};
//class Context {
//public:
// Context& SetArribute(const std::string& name, const std::string& val);
//
// int get_int(const std::string& name) {
// CHECK(params_.find(name) != params_.end());
// return atoi(params_[name].c_str());
// }
//
//private:
// std::string get(const std::string& name) const;
// std::map<std::string, std::string> params_;
//};
//
//class WorkerTable;
//
//class TableRegistry {
//public:
// typedef WorkerTable* (*Creator)(const Context&);
// typedef std::map<std::string, Creator> Registry;
// static TableRegistry* Global();
//
// static void AddCreator(const std::string& type, Creator creator) {
// Registry& r = registry();
// r[type] = creator;
// }
//
// static Registry& registry() {
// static Registry instance;
// return instance;
// }
//
// static WorkerTable* CreateTable(const std::string& type, const Context& context) {
// Registry& r = registry();
// return r[type](context);
//
// }
//
//private:
// TableRegistry() {}
//};
//
//class TableRegisterer {
//public:
// TableRegisterer(const std::string& type,
// WorkerTable* (*creator)(const Context&)) {
// TableRegistry::AddCreator(type, creator);
// }
//};
//
//#define REGISTER_TABLE_CREATOR(type, creator) \
// static TableRegisterer(type, creator) g_creator_##type(#type, creator);
//
}
#endif // MULTIVERSO_INCLUDE_TABLE_FACTORY_H_