zmq hangs issue. a tricky solution

This commit is contained in:
feiga 2016-06-20 20:32:57 +08:00
Родитель 6091fbac06
Коммит 72fae9524b
4 изменённых файлов: 39 добавлений и 15 удалений

Просмотреть файл

@ -10,6 +10,8 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "Test", "Test\Test.vcxproj",
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "Multiverso", "src\Multiverso.vcxproj", "{16F14058-B116-49D9-8BA0-209F3AFFE849}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "app", "app", "{E89A2FAD-9A40-49CD-89C1-519721658DF0}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
debug_zmq|x64 = debug_zmq|x64

Просмотреть файл

@ -29,11 +29,10 @@ public:
// get machine file
if (active_) return;
// CHECK(*argc > 2);
std::vector<std::string> machine_lists;
ParseMachineFile(MV_CONFIG_machine_file, &machine_lists);
ParseMachineFile(MV_CONFIG_machine_file, &machine_lists_);
int port = MV_CONFIG_port; // atoi(argv[2]);
size_ = static_cast<int>(machine_lists.size());
size_ = static_cast<int>(machine_lists_.size());
CHECK(size_ > 0);
std::unordered_set<std::string> local_ip;
net::GetLocalIPAddress(&local_ip);
@ -41,7 +40,7 @@ public:
context_ = zmq_ctx_new();
zmq_ctx_set(context_, ZMQ_MAX_SOCKETS, 256);
for (auto ip : machine_lists) {
for (auto ip : machine_lists_) {
if (local_ip.find(ip) != local_ip.end()) { // my rank
rank_ = static_cast<int>(senders_.size());
senders_.push_back(nullptr);
@ -49,12 +48,16 @@ public:
int rc = zmq_bind(receiver_,
("tcp://" + ip + ":" + std::to_string(port)).c_str());
CHECK(rc == 0);
int linger = 0;
CHECK(zmq_setsockopt(receiver_, ZMQ_LINGER, &linger, sizeof(linger)) == 0);
} else {
void* senders = zmq_socket(context_, ZMQ_DEALER);
int rc = zmq_connect(senders,
void* sender = zmq_socket(context_, ZMQ_DEALER);
int rc = zmq_connect(sender,
("tcp://" + ip + ":" + std::to_string(port)).c_str());
CHECK(rc == 0);
senders_.push_back(senders);
senders_.push_back(sender);
int linger = 0;
CHECK(zmq_setsockopt(sender, ZMQ_LINGER, &linger, sizeof(linger)) == 0);
}
}
CHECK_NOTNULL(receiver_);
@ -71,6 +74,8 @@ public:
receiver_ = zmq_socket(context_, ZMQ_DEALER);
int rc = zmq_bind(receiver_, ("tcp://" + ip_port).c_str());
if (rc == 0) {
int linger = 0;
CHECK(zmq_setsockopt(receiver_, ZMQ_LINGER, &linger, sizeof(linger)) == 0);
return 0;
}
else {
@ -91,6 +96,7 @@ public:
if (rank == rank_) continue;
std::string ip_port(endpoints[i]);
senders_[rank] = zmq_socket(context_, ZMQ_DEALER);
// NOTE(feiga): set linger to 0, otherwise will hang
int rc = zmq_connect(senders_[rank], ("tcp://" + ip_port).c_str());
if (rc != 0) {
Log::Error("Failed to connect the socket for sender, rank = %d, "
@ -104,10 +110,22 @@ public:
void Finalize() override {
active_ = false;
// zmq_term(context_);
zmq_close(receiver_);
for (auto& p : senders_) if (p) zmq_close(p);
zmq_ctx_destroy(context_);
for (int i = 0; i < senders_.size(); ++i) {
if (i != rank_) {
int linger = 0;
CHECK(zmq_setsockopt(senders_[i], ZMQ_LINGER, &linger, sizeof(linger)) == 0);
int rc = zmq_close(senders_[i]);
CHECK(rc == 0);
}
}
int linger = 0;
CHECK(zmq_setsockopt(receiver_, ZMQ_LINGER, &linger, sizeof(linger)) == 0);
int rc = zmq_close(receiver_);
CHECK(rc == 0);
CHECK(zmq_ctx_shutdown(context_)==0);
// zmq_ctx_term(context_);
Log::Info("zmq finalize: close context\n");
}
bool active() const override { return active_; }
@ -151,7 +169,7 @@ public:
MessagePtr& msg = *msg_ptr;
msg->data().clear();
CHECK(msg.get());
recv_size = zmq_recv(receiver_, msg->header(), Message::kHeaderSize, 0);
recv_size = zmq_recv(receiver_, msg->header(), Message::kHeaderSize, ZMQ_DONTWAIT);
if (recv_size < 0) { return -1; }
CHECK(Message::kHeaderSize == recv_size);
@ -236,6 +254,7 @@ protected:
std::vector<void*> senders_;
int rank_;
int size_;
std::vector<std::string> machine_lists_;
};
} // namespace multiverso

Просмотреть файл

@ -33,7 +33,7 @@ Communicator::Communicator() : Actor(actor::kCommunicator) {
}
Communicator::~Communicator() {
recv_thread_->join();
}
void Communicator::Main() {
@ -78,8 +78,9 @@ void Communicator::Communicate() {
MessagePtr msg(new Message());
size_t size = net_util_->Recv(&msg);
if (size == -1) {
Log::Debug("recv return -1\n");
break;
continue;
// Log::Debug("recv return -1\n");
// break;
}
if (size > 0) {
// a message received

Просмотреть файл

@ -50,6 +50,8 @@ void Zoo::Stop(bool finalize_net) {
if (!MV_CONFIG_ma) { StopPS(); }
// Stop the network
if (finalize_net) net_util_->Finalize();
for (auto actor : zoo_) delete actor.second;
Log::Info("Multiverso Shutdown successfully\n");
}
int Zoo::rank() const { return NetInterface::Get()->rank(); }