implementation of sync with backup worker. simple test, need further test, code is dirty, need clean

This commit is contained in:
feiga 2016-04-19 19:43:25 +08:00
Родитель 948885a6a3
Коммит 946032ad19
4 изменённых файлов: 152 добавлений и 7 удалений

Просмотреть файл

@ -4,6 +4,9 @@ Microsoft Visual Studio Solution File, Format Version 12.00
VisualStudioVersion = 12.0.40629.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "Test", "Test\Test.vcxproj", "{546681D6-495C-4AEE-BBC2-3CAEC86B5137}"
ProjectSection(ProjectDependencies) = postProject
{16F14058-B116-49D9-8BA0-209F3AFFE849} = {16F14058-B116-49D9-8BA0-209F3AFFE849}
EndProjectSection
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "Multiverso", "src\Multiverso.vcxproj", "{16F14058-B116-49D9-8BA0-209F3AFFE849}"
EndProject

Просмотреть файл

@ -88,8 +88,6 @@ void TestArray(int argc, char* argv[]) {
int iter = 1000;
if (argc == 2) iter = atoi(argv[1]);
for (int i = 0; i < iter; ++i) {
// std::vector<float>& vec = shared_array->raw();

Просмотреть файл

@ -102,7 +102,12 @@ void SetCMDFlag(const std::string& name, const T& value) {
#define MV_DECLARE_bool(name) \
DECLARE_CONFIGURE(bool, name)
#define MV_DEFINE_double(name, default_value, text) \
DEFINE_CONFIGURE(double, name, default_value, text)
#define MV_DECLARE_double(name) \
DECLARE_CONFIGURE(double, name)
} // namespace multiverso
#endif // MULTIVERSO_UTIL_CONFIGURE_H_

Просмотреть файл

@ -18,6 +18,7 @@
namespace multiverso {
MV_DEFINE_bool(sync, false, "sync or async");
MV_DEFINE_int(backup_worker_ratio, 0, "ratio% of backup workers, set 20 means 20%");
Server::Server() : Actor(actor::kServer) {
RegisterHandler(MsgType::Request_Get, std::bind(
@ -72,9 +73,11 @@ public:
// please not use in other place, may different with the general vector clock
class VectorClock {
public:
explicit VectorClock(int n) : local_clock_(n, 0), global_clock_(0) {}
explicit VectorClock(int n) :
local_clock_(n, 0), global_clock_(0), size_(0) {}
bool Update(int i) {
// Return true when all clock reach a same number
virtual bool Update(int i) {
++local_clock_[i];
if (global_clock_ < *(std::min_element(std::begin(local_clock_),
std::end(local_clock_)))) {
@ -97,9 +100,10 @@ public:
int local_clock(int i) const { return local_clock_[i]; }
int global_clock() const { return global_clock_; }
private:
protected:
std::vector<int> local_clock_;
int global_clock_;
int size_;
};
protected:
void ProcessAdd(MessagePtr& msg) override {
@ -157,9 +161,144 @@ private:
MtQueue<MessagePtr> msg_get_cache_;
};
class WithBackupSyncServer : public Server {
public:
WithBackupSyncServer() : Server() {
num_worker_ = Zoo::Get()->num_workers();
double backup_ratio = (double)MV_CONFIG_backup_worker_ratio / 100;
num_sync_worker_ = num_worker_ -
static_cast<int>(backup_ratio * num_worker_);
CHECK(num_sync_worker_ > 0 && num_sync_worker_ <= num_worker_);
if (num_sync_worker_ == num_worker_) {
Log::Info("No backup worker, using the sync mode\n");
}
Log::Info("Sync with backup worker start: num_sync_worker = %d,"
"num_total_worker = %d\n", num_sync_worker_, num_worker_);
worker_get_clocks_.reset(new SyncServer::VectorClock(num_worker_));
worker_add_clocks_.reset(new VectorClock(num_worker_, num_sync_worker_));
}
// make some modification to suit to the sync server
// please not use in other place, may different with the general vector clock
class VectorClock {
public:
VectorClock(int num_worker, int num_sync_worker) :
local_clock_(num_worker, 0), global_clock_(0), num_worker_(num_worker),
num_sync_worker_(num_sync_worker), progress_(0) {}
// Return true when all clock reach a same number
virtual bool Update(int i) {
if (local_clock_[i]++ == global_clock_) {
++progress_;
}
if (progress_ >= num_sync_worker_) {
++global_clock_;
progress_ = 0;
for (auto i : local_clock_) {
if (i > global_clock_) ++progress_;
}
if (global_clock_ == *(std::max_element(std::begin(local_clock_),
std::end(local_clock_)))) {
return true;
}
}
return false;
}
std::string DebugString() {
std::string os = "global ";
os += std::to_string(global_clock_) + " local: ";
for (auto i : local_clock_) os += std::to_string(i) + " ";
return os;
}
int local_clock(int i) const { return local_clock_[i]; }
int global_clock() const { return global_clock_; }
protected:
std::vector<int> local_clock_;
int global_clock_;
int num_worker_;
int num_sync_worker_;
int progress_;
};
protected:
void ProcessAdd(MessagePtr& msg) override {
// 1. Before add: cache faster worker
int worker = Zoo::Get()->rank_to_worker_id(msg->src());
if (worker_get_clocks_->local_clock(worker) >
worker_get_clocks_->global_clock()) {
msg_add_cache_.Push(msg);
return;
}
// 2. Process Add
if (worker_add_clocks_->local_clock(worker) >=
worker_add_clocks_->global_clock()) {
Server::ProcessAdd(msg);
}
// 3. After add: process cached process get if necessary
if (worker_add_clocks_->Update(worker)) {
CHECK(msg_add_cache_.Empty());
while (!msg_get_cache_.Empty()) {
MessagePtr get_msg;
CHECK(msg_get_cache_.TryPop(get_msg));
int get_worker = Zoo::Get()->rank_to_worker_id(get_msg->src());
Server::ProcessGet(get_msg);
worker_get_clocks_->Update(get_worker);
}
}
}
void ProcessGet(MessagePtr& msg) override {
// 1. Before get: cache faster worker
int worker = Zoo::Get()->rank_to_worker_id(msg->src());
if (worker_add_clocks_->local_clock(worker) >
worker_add_clocks_->global_clock()) {
// Will wait for other worker finished Add
msg_get_cache_.Push(msg);
return;
}
// 2. Process Get
Server::ProcessGet(msg);
// 3. After get: process cached process add if necessary
if (worker_get_clocks_->Update(worker)) {
CHECK(msg_get_cache_.Empty());
while (!msg_add_cache_.Empty()) {
MessagePtr add_msg;
CHECK(msg_add_cache_.TryPop(add_msg));
int add_worker = Zoo::Get()->rank_to_worker_id(add_msg->src());
if (worker_add_clocks_->local_clock(add_worker) >=
worker_add_clocks_->global_clock()) {
Server::ProcessAdd(msg);
};
worker_add_clocks_->Update(add_worker);
}
}
}
private:
std::unique_ptr<SyncServer::VectorClock> worker_get_clocks_;
std::unique_ptr<VectorClock> worker_add_clocks_;
MtQueue<MessagePtr> msg_add_cache_;
MtQueue<MessagePtr> msg_get_cache_;
// num_worker_ - num_sync_worker_ = num_backup_worker_
int num_sync_worker_;
int num_worker_;
};
Server* Server::GetServer() {
if (MV_CONFIG_sync) return new SyncServer();
return new Server();
if (!MV_CONFIG_sync) {
Log::Info("Create a async server\n");
return new Server();
}
if (MV_CONFIG_backup_worker_ratio > 0.0) {
Log::Info("Create a sync server with backup worker\n");
return new WithBackupSyncServer();
}
Log::Info("Create a sync server\n");
return new SyncServer();
}
} // namespace multiverso