From 195317b6a75009181e57d05c1683899fbaf9f34e Mon Sep 17 00:00:00 2001 From: feiga Date: Thu, 24 Mar 2016 14:35:42 +0800 Subject: [PATCH] Add dashboard to monitor the system excuation information for profile --- next/IMultiverso.vcxproj | 4 + next/IMultiverso.vcxproj.filters | 12 +++ next/Test/Test.vcxproj | 3 + next/Test/main.cpp | 1 - next/include/multiverso/dashboard.h | 81 ++++++++++++++++++++ next/include/multiverso/multiverso.h | 9 +-- next/include/multiverso/net/mpi_net.h | 18 +++-- next/include/multiverso/node.h | 5 ++ next/include/multiverso/table_interface.h | 46 +++--------- next/include/multiverso/util/timer.h | 19 ++++- next/src/dashboard.cpp | 42 +++++++++++ next/src/node.cpp | 4 +- next/src/server.cpp | 6 ++ next/src/table.cpp | 14 +++- next/src/timer.cpp | 20 +++++ next/src/worker.cpp | 10 ++- next/src/zoo.cpp | 3 + next/table_factory.h | 92 +++++++++++++++++++++++ 18 files changed, 335 insertions(+), 54 deletions(-) create mode 100644 next/include/multiverso/dashboard.h create mode 100644 next/src/dashboard.cpp create mode 100644 next/src/timer.cpp create mode 100644 next/table_factory.h diff --git a/next/IMultiverso.vcxproj b/next/IMultiverso.vcxproj index 3a14cb6..4f8eb36 100644 --- a/next/IMultiverso.vcxproj +++ b/next/IMultiverso.vcxproj @@ -171,6 +171,7 @@ + @@ -193,16 +194,19 @@ + + + diff --git a/next/IMultiverso.vcxproj.filters b/next/IMultiverso.vcxproj.filters index 557d377..f004dd7 100644 --- a/next/IMultiverso.vcxproj.filters +++ b/next/IMultiverso.vcxproj.filters @@ -79,6 +79,12 @@ util + + include + + + system + @@ -134,5 +140,11 @@ system + + util + + + system + \ No newline at end of file diff --git a/next/Test/Test.vcxproj b/next/Test/Test.vcxproj index a30e256..40511b4 100644 --- a/next/Test/Test.vcxproj +++ b/next/Test/Test.vcxproj @@ -83,6 +83,9 @@ true IMultiverso.lib;kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;%(AdditionalDependencies) + + false + diff --git a/next/Test/main.cpp b/next/Test/main.cpp index 2d07225..8071f81 100644 --- a/next/Test/main.cpp +++ b/next/Test/main.cpp @@ -40,7 +40,6 @@ void TestKV(int argc, char* argv[]) { KVWorkerTable* dht = new KVWorkerTable(); // if the node is server, then create a server storage table KVServerTable* server_dht = new KVServerTable(); - Log::Info("table name %s", server_dht->name().c_str()); MV_Barrier(); diff --git a/next/include/multiverso/dashboard.h b/next/include/multiverso/dashboard.h new file mode 100644 index 0000000..dac750c --- /dev/null +++ b/next/include/multiverso/dashboard.h @@ -0,0 +1,81 @@ +#ifndef MULTIVERSO_INCLUDE_DASHBOARD_H_ +#define MULTIVERSO_INCLUDE_DASHBOARD_H_ + +#include +#include +#include + +#include "multiverso/util/timer.h" + +namespace multiverso { + +class Monitor; + +// Dashboard to record and query system running information +// thread safe +class Dashboard { +public: + static void AddMonitor(const std::string& name, Monitor* monitor); + static std::string Watch(const std::string& name); + static void Display(); +private: + static std::map record_; + static std::mutex m_; +}; + +class Monitor { +public: + explicit Monitor(const std::string& name) { + name_ = name; + timer_.Start(); + Dashboard::AddMonitor(name_, this); + } + + void Begin() { timer_.Start(); } + + void End() { + elapse_ += timer_.elapse(); + ++count_; + } + + double average() const { return elapse_ / count_; }; + + std::string name() const { return name_; } + double elapse() const { return elapse_; } + int count() const { return count_; } + + std::string info_string() const; + +private: + // name of the Monitor + std::string name_; + // total elapsed time + double elapse_; + // count of monitor + int count_; + // a timer util + Timer timer_; +}; + +// NOTE(feiga): user shouldn't call this, used in MONITOR_BEGIN as a local var +// note that static variable under different context may differs +// please use either only in global scope or only in local scope +#define REGISTER_MONITOR(name) \ + static Monitor g_##name##_monitor(#name); + +// Guard with MONITOR macro in the code to monitor it's excuation +// Usage: +// MONITOR_BEGIN(your_code) +// your code +// MONITOR_END(your_code) +#define MONITOR_BEGIN(name) \ + REGISTER_MONITOR(name) \ + g_##name##_monitor.Begin(); + +#define MONITOR_END(name) \ + g_##name##_monitor.End(); + + +} + +#endif // MULTIVERSO_INCLUDE_DASHBOARD_H_ \ No newline at end of file diff --git a/next/include/multiverso/multiverso.h b/next/include/multiverso/multiverso.h index 9f05696..cbd7b9f 100644 --- a/next/include/multiverso/multiverso.h +++ b/next/include/multiverso/multiverso.h @@ -3,16 +3,9 @@ namespace multiverso { -enum Role { - Null = 0, - Worker = 1, - Server = 2, - All = 3 -}; - void MV_Init(int* argc = nullptr, char* argv[] = nullptr, - int role = All); + int role = 3); void MV_Barrier(); diff --git a/next/include/multiverso/net/mpi_net.h b/next/include/multiverso/net/mpi_net.h index 6ee2606..0c5e73b 100644 --- a/next/include/multiverso/net/mpi_net.h +++ b/next/include/multiverso/net/mpi_net.h @@ -10,6 +10,7 @@ #include #include "multiverso/message.h" +#include "multiverso/dashboard.h" #include "multiverso/util/log.h" #include "multiverso/util/mt_queue.h" @@ -198,6 +199,7 @@ public: size_t SerializeAndSend(MessagePtr& msg, MPIMsgHandle* msg_handle) { CHECK_NOTNULL(msg_handle); + MONITOR_BEGIN(MPI_NET_SEND_SERIALIZE); size_t size = sizeof(int) + Message::kHeaderSize; for (auto& data : msg->data()) size += sizeof(int) + data.size(); if (size > send_size_) { @@ -207,17 +209,18 @@ public: memcpy(send_buffer_, msg->header(), Message::kHeaderSize); char* p = send_buffer_ + Message::kHeaderSize; for (auto& data : msg->data()) { - int s = data.size(); - memcpy(p, &s, sizeof(int)); - p += sizeof(int); + size_t s = data.size(); + memcpy(p, &s, sizeof(size_t)); + p += sizeof(size_t); memcpy(p, data.data(), s); p += s; } int over = -1; memcpy(p, &over, sizeof(int)); + MONITOR_END(MPI_NET_SEND_SERIALIZE); MPI_Request handle; - MV_MPI_CALL(MPI_Isend(send_buffer_, size, MPI_BYTE, msg->dst(), 0, MPI_COMM_WORLD, &handle)); + MV_MPI_CALL(MPI_Isend(send_buffer_, static_cast(size), MPI_BYTE, msg->dst(), 0, MPI_COMM_WORLD, &handle)); msg_handle->add_handle(handle); return size; } @@ -229,6 +232,8 @@ public: MPI_Status status; MV_MPI_CALL(MPI_Recv(recv_buffer_, count, MPI_BYTE, src, 0, MPI_COMM_WORLD, &status)); + + MONITOR_BEGIN(MPI_NET_RECV_DESERIALIZE) char* p = recv_buffer_; int s; memcpy(msg->header(), p, Message::kHeaderSize); @@ -243,6 +248,7 @@ public: memcpy(&s, p, sizeof(int)); p += sizeof(int); } + MONITOR_END(MPI_NET_RECV_DESERIALIZE) return count; } @@ -317,9 +323,9 @@ private: std::unique_ptr last_handle_; MtQueue send_queue_; char* send_buffer_; - int send_size_; + size_t send_size_; char* recv_buffer_; - int recv_size_; + size_t recv_size_; }; } diff --git a/next/include/multiverso/node.h b/next/include/multiverso/node.h index 2b4290f..26e9cdd 100644 --- a/next/include/multiverso/node.h +++ b/next/include/multiverso/node.h @@ -3,6 +3,11 @@ namespace multiverso { +enum Role { + WORKER = 1, + SERVER = 2 +}; + struct Node { int rank; // role can be 0, 1, 2, 3 diff --git a/next/include/multiverso/table_interface.h b/next/include/multiverso/table_interface.h index bdd0ff2..9dfbc29 100644 --- a/next/include/multiverso/table_interface.h +++ b/next/include/multiverso/table_interface.h @@ -16,10 +16,9 @@ public: WorkerTable(); virtual ~WorkerTable() {} - void Get(Blob keys) { Wait(GetAsync(keys)); } - void Add(Blob keys, Blob values) { Wait(AddAsync(keys, values)); } + void Get(Blob keys); + void Add(Blob keys, Blob values); - // TODO(feiga): add call back int GetAsync(Blob keys); int AddAsync(Blob keys, Blob values); @@ -34,8 +33,6 @@ public: virtual void ProcessReplyGet(std::vector&) = 0; - const std::string name() { return std::string(typeid(this).name());}; - // add user defined data structure private: std::string table_name_; @@ -44,6 +41,16 @@ private: int msg_id_; }; +// TODO(feiga): move to a seperate file +class Stream; + +// interface for checkpoint table +class Serializable { +public: + virtual void Store(Stream* s) = 0; + virtual void Load(Stream* s) = 0; +}; + // discribe the server parameter storage data structure and related method class ServerTable { public: @@ -52,35 +59,6 @@ public: virtual void ProcessAdd(const std::vector& data) = 0; virtual void ProcessGet(const std::vector& data, std::vector* result) = 0; - - const std::string name() const { return std::string(typeid(this).name());}; - - // add user defined server process logic - void Process(const std::string instruction, const std::vector& data, std::vector* result = nullptr); - - // add user defined server storage data structure -}; - -// TODO(feiga): provide better table creator method -// Abstract Factory to create server and worker -class TableFactory { - // static TableFactory* GetTableFactory(); - virtual WorkerTable* CreateWorker() = 0; - virtual ServerTable* CreateServer() = 0; - static TableFactory* fatory_; -}; - -namespace table { - -} - -class TableBuilder { -public: - TableBuilder& SetArribute(const std::string& name, const std::string& val); - WorkerTable* WorkerTableBuild(); - ServerTable* ServerTableBuild(); -private: - std::unordered_map params_; }; } diff --git a/next/include/multiverso/util/timer.h b/next/include/multiverso/util/timer.h index 9f0cd63..ab34611 100644 --- a/next/include/multiverso/util/timer.h +++ b/next/include/multiverso/util/timer.h @@ -2,7 +2,24 @@ #define MULTIVERSO_TIMER_H_ #include -#include +namespace multiverso { + +class Timer { +public: + Timer(); + + // (Re)start the timer + void Start(); + + // Get elapsed milliseconds since last Timer::Start + double elapse(); + +private: + using TimePoint = std::chrono::system_clock::time_point; + TimePoint start_point_; +}; + +} #endif //MULTIVERSO_TIMER_H_ \ No newline at end of file diff --git a/next/src/dashboard.cpp b/next/src/dashboard.cpp new file mode 100644 index 0000000..0a7819c --- /dev/null +++ b/next/src/dashboard.cpp @@ -0,0 +1,42 @@ +#include "multiverso/dashboard.h" +#include "multiverso/util/log.h" + +#include + +namespace multiverso { +std::map Dashboard::record_; +std::mutex Dashboard::m_; + +void Dashboard::AddMonitor(const std::string& name, Monitor* monitor) { + std::lock_guard l(m_); + CHECK(record_[name] == nullptr); + record_[name] = monitor; + Log::Info("Add monitor %s\n", name.c_str()); +} + +std::string Dashboard::Watch(const std::string& name) { + std::lock_guard l(m_); + std::string result; + if (record_.find(name) == record_.end()) return result; + Monitor* monitor = record_[name]; + CHECK_NOTNULL(monitor); + return monitor->info_string(); +} + +std::string Monitor::info_string() const { + std::ostringstream oss; + oss << "Monitor (" << name_ << ")] " + << " count = " << count_ + << " elapse = " << elapse_ << "ms" + << " average = " << average() << "ms"; + return oss.str(); +} + +void Dashboard::Display() { + std::lock_guard l(m_); + Log::Info("--------------Show dashboard monitor information--------------\n"); + for (auto& it : record_) Log::Info("%s\n", it.second->info_string().c_str()); + Log::Info("--------------------------------------------------------------\n"); +} + +} \ No newline at end of file diff --git a/next/src/node.cpp b/next/src/node.cpp index 9cf5672..ac5efa8 100644 --- a/next/src/node.cpp +++ b/next/src/node.cpp @@ -5,8 +5,8 @@ namespace multiverso { Node::Node() : rank(-1), role(-1), worker_id(-1), server_id(-1) {} namespace node { -bool is_worker(int role) { return (role & 1) != 0; } -bool is_server(int role) { return (role & 2) != 0; } +bool is_worker(int role) { return (role & Role::WORKER) != 0; } +bool is_server(int role) { return (role & Role::SERVER) != 0; } } diff --git a/next/src/server.cpp b/next/src/server.cpp index 09c424b..e723954 100644 --- a/next/src/server.cpp +++ b/next/src/server.cpp @@ -4,6 +4,8 @@ #include "multiverso/table_interface.h" #include "multiverso/zoo.h" +#include "multiverso/dashboard.h" + namespace multiverso { Server::Server() : Actor(actor::kServer) { @@ -20,19 +22,23 @@ int Server::RegisterTable(ServerTable* server_table) { } void Server::ProcessGet(MessagePtr& msg) { + MONITOR_BEGIN(SERVER_PROCESS_GET); MessagePtr reply(msg->CreateReplyMessage()); int table_id = msg->table_id(); CHECK(table_id >= 0 && table_id < store_.size()); store_[table_id]->ProcessGet(msg->data(), &reply->data()); SendTo(actor::kCommunicator, reply); + MONITOR_END(SERVER_PROCESS_GET); } void Server::ProcessAdd(MessagePtr& msg) { + MONITOR_BEGIN(SERVER_PROCESS_ADD) MessagePtr reply(msg->CreateReplyMessage()); int table_id = msg->table_id(); CHECK(table_id >= 0 && table_id < store_.size()); store_[table_id]->ProcessAdd(msg->data()); SendTo(actor::kCommunicator, reply); + MONITOR_END(SERVER_PROCESS_ADD) } } \ No newline at end of file diff --git a/next/src/table.cpp b/next/src/table.cpp index 28cbb98..6f5436a 100644 --- a/next/src/table.cpp +++ b/next/src/table.cpp @@ -1,8 +1,8 @@ #include "multiverso/table_interface.h" - #include "multiverso/util/log.h" #include "multiverso/util/waiter.h" #include "multiverso/zoo.h" +#include "multiverso/dashboard.h" namespace multiverso { @@ -14,6 +14,18 @@ ServerTable::ServerTable() { Zoo::Get()->RegisterTable(this); } +void WorkerTable::Get(Blob keys) { + MONITOR_BEGIN(WORKER_TABLE_SYNC_GET) + Wait(GetAsync(keys)); + MONITOR_END(WORKER_TABLE_SYNC_GET) +} + +void WorkerTable::Add(Blob keys, Blob values) { + MONITOR_BEGIN(WORKER_TABLE_SYNC_ADD) + Wait(AddAsync(keys, values)); + MONITOR_END(WORKER_TABLE_SYNC_ADD) +} + int WorkerTable::GetAsync(Blob keys) { int id = msg_id_++; waitings_[id] = new Waiter(); diff --git a/next/src/timer.cpp b/next/src/timer.cpp new file mode 100644 index 0000000..0833011 --- /dev/null +++ b/next/src/timer.cpp @@ -0,0 +1,20 @@ +#include "multiverso/util/timer.h" + +namespace multiverso { + +Timer::Timer() { + Start(); +} + +void Timer::Start() { + start_point_ = std::chrono::high_resolution_clock::now(); +} + +double Timer::elapse() { + TimePoint end_point = std::chrono::high_resolution_clock::now(); + std::chrono::duration time_ms = + end_point - start_point_; + return time_ms.count(); +} + +} \ No newline at end of file diff --git a/next/src/worker.cpp b/next/src/worker.cpp index e9377c1..b331ab7 100644 --- a/next/src/worker.cpp +++ b/next/src/worker.cpp @@ -1,4 +1,6 @@ #include "multiverso/worker.h" + +#include "multiverso/dashboard.h" #include "multiverso/util/mt_queue.h" #include "multiverso/zoo.h" @@ -21,6 +23,7 @@ int Worker::RegisterTable(WorkerTable* worker_table) { } void Worker::ProcessGet(MessagePtr& msg) { + MONITOR_BEGIN(WORKER_PROCESS_GET) int table_id = msg->table_id(); int msg_id = msg->msg_id(); std::unordered_map> partitioned_key; @@ -36,9 +39,11 @@ void Worker::ProcessGet(MessagePtr& msg) { msg->set_data(it.second); SendTo(actor::kCommunicator, msg); } + MONITOR_END(WORKER_PROCESS_GET) } void Worker::ProcessAdd(MessagePtr& msg) { + MONITOR_BEGIN(WORKER_PROCESS_ADD) int table_id = msg->table_id(); int msg_id = msg->msg_id(); std::unordered_map> partitioned_kv; @@ -47,7 +52,7 @@ void Worker::ProcessAdd(MessagePtr& msg) { int num = cache_[table_id]->Partition(msg->data(), &partitioned_kv); cache_[table_id]->Reset(msg_id, num); for (auto& it : partitioned_kv) { - MessagePtr msg(new Message()); // = std::make_unique(); + MessagePtr msg(new Message()); msg->set_src(Zoo::Get()->rank()); msg->set_dst(it.first); msg->set_type(MsgType::Request_Add); @@ -56,12 +61,15 @@ void Worker::ProcessAdd(MessagePtr& msg) { msg->set_data(it.second); SendTo(actor::kCommunicator, msg); } + MONITOR_END(WORKER_PROCESS_ADD) } void Worker::ProcessReplyGet(MessagePtr& msg) { + MONITOR_BEGIN(WORKER_PROCESS_REPLY_GET) int table_id = msg->table_id(); cache_[table_id]->ProcessReplyGet(msg->data()); cache_[table_id]->Notify(msg->msg_id()); + MONITOR_END(WORKER_PROCESS_REPLY_GET) } void Worker::ProcessReplyAdd(MessagePtr& msg) { diff --git a/next/src/zoo.cpp b/next/src/zoo.cpp index 722b192..e9c6cc4 100644 --- a/next/src/zoo.cpp +++ b/next/src/zoo.cpp @@ -8,6 +8,7 @@ #include "multiverso/worker.h" #include "multiverso/server.h" #include "multiverso/controller.h" +#include "multiverso/dashboard.h" namespace multiverso { @@ -42,6 +43,8 @@ void Zoo::Stop(bool finalize_net) { // Stop the system Barrier(); + Dashboard::Display(); + // Stop all actors for (auto actor : zoo_) { actor.second->Stop(); } // Stop the network diff --git a/next/table_factory.h b/next/table_factory.h new file mode 100644 index 0000000..99e82fe --- /dev/null +++ b/next/table_factory.h @@ -0,0 +1,92 @@ +#ifndef MULTIVERSO_INCLUDE_TABLE_FACTORY_H_ +#define MULTIVERSO_INCLUDE_TABLE_FACTORY_H_ + +#include +#include +#include + +#include "multiverso/util/log.h" + +namespace multiverso { + +// TODO(feiga): Refine + +// TODO(feiga): provide better table creator method +// Abstract Factory to create server and worker +//class TableFactory { +// // static TableFactory* GetTableFactory(); +// virtual WorkerTable* CreateWorker() = 0; +// virtual ServerTable* CreateServer() = 0; +// static TableFactory* fatory_; +//}; + +//namespace table { + +//} + +//class TableBuilder { +//public: +// TableBuilder& SetArribute(const std::string& name, const std::string& val); +// WorkerTable* WorkerTableBuild(); +// ServerTable* ServerTableBuild(); +//private: +// std::string Get(const std::string& name) const; +// std::unordered_map params_; +//}; + +//class Context { +//public: +// Context& SetArribute(const std::string& name, const std::string& val); +// +// int get_int(const std::string& name) { +// CHECK(params_.find(name) != params_.end()); +// return atoi(params_[name].c_str()); +// } +// +//private: +// std::string get(const std::string& name) const; +// std::map params_; +//}; +// +//class WorkerTable; +// +//class TableRegistry { +//public: +// typedef WorkerTable* (*Creator)(const Context&); +// typedef std::map Registry; +// static TableRegistry* Global(); +// +// static void AddCreator(const std::string& type, Creator creator) { +// Registry& r = registry(); +// r[type] = creator; +// } +// +// static Registry& registry() { +// static Registry instance; +// return instance; +// } +// +// static WorkerTable* CreateTable(const std::string& type, const Context& context) { +// Registry& r = registry(); +// return r[type](context); +// +// } +// +//private: +// TableRegistry() {} +//}; +// +//class TableRegisterer { +//public: +// TableRegisterer(const std::string& type, +// WorkerTable* (*creator)(const Context&)) { +// TableRegistry::AddCreator(type, creator); +// } +//}; +// +//#define REGISTER_TABLE_CREATOR(type, creator) \ +// static TableRegisterer(type, creator) g_creator_##type(#type, creator); +// +} + +#endif // MULTIVERSO_INCLUDE_TABLE_FACTORY_H_ \ No newline at end of file