From 72fae9524b81817202b1d0aebd233e0950a4d8b0 Mon Sep 17 00:00:00 2001 From: feiga Date: Mon, 20 Jun 2016 20:32:57 +0800 Subject: [PATCH] zmq hangs issue. a tricky solution --- Multiverso.sln | 2 ++ include/multiverso/net/zmq_net.h | 43 +++++++++++++++++++++++--------- src/communicator.cpp | 7 +++--- src/zoo.cpp | 2 ++ 4 files changed, 39 insertions(+), 15 deletions(-) diff --git a/Multiverso.sln b/Multiverso.sln index 0c23b3f..92ff352 100644 --- a/Multiverso.sln +++ b/Multiverso.sln @@ -10,6 +10,8 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "Test", "Test\Test.vcxproj", EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "Multiverso", "src\Multiverso.vcxproj", "{16F14058-B116-49D9-8BA0-209F3AFFE849}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "app", "app", "{E89A2FAD-9A40-49CD-89C1-519721658DF0}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution debug_zmq|x64 = debug_zmq|x64 diff --git a/include/multiverso/net/zmq_net.h b/include/multiverso/net/zmq_net.h index 34c5f09..28c6876 100644 --- a/include/multiverso/net/zmq_net.h +++ b/include/multiverso/net/zmq_net.h @@ -29,11 +29,10 @@ public: // get machine file if (active_) return; // CHECK(*argc > 2); - std::vector machine_lists; - ParseMachineFile(MV_CONFIG_machine_file, &machine_lists); + ParseMachineFile(MV_CONFIG_machine_file, &machine_lists_); int port = MV_CONFIG_port; // atoi(argv[2]); - size_ = static_cast(machine_lists.size()); + size_ = static_cast(machine_lists_.size()); CHECK(size_ > 0); std::unordered_set local_ip; net::GetLocalIPAddress(&local_ip); @@ -41,7 +40,7 @@ public: context_ = zmq_ctx_new(); zmq_ctx_set(context_, ZMQ_MAX_SOCKETS, 256); - for (auto ip : machine_lists) { + for (auto ip : machine_lists_) { if (local_ip.find(ip) != local_ip.end()) { // my rank rank_ = static_cast(senders_.size()); senders_.push_back(nullptr); @@ -49,12 +48,16 @@ public: int rc = zmq_bind(receiver_, ("tcp://" + ip + ":" + std::to_string(port)).c_str()); CHECK(rc == 0); + int linger = 0; + CHECK(zmq_setsockopt(receiver_, ZMQ_LINGER, &linger, sizeof(linger)) == 0); } else { - void* senders = zmq_socket(context_, ZMQ_DEALER); - int rc = zmq_connect(senders, + void* sender = zmq_socket(context_, ZMQ_DEALER); + int rc = zmq_connect(sender, ("tcp://" + ip + ":" + std::to_string(port)).c_str()); CHECK(rc == 0); - senders_.push_back(senders); + senders_.push_back(sender); + int linger = 0; + CHECK(zmq_setsockopt(sender, ZMQ_LINGER, &linger, sizeof(linger)) == 0); } } CHECK_NOTNULL(receiver_); @@ -71,6 +74,8 @@ public: receiver_ = zmq_socket(context_, ZMQ_DEALER); int rc = zmq_bind(receiver_, ("tcp://" + ip_port).c_str()); if (rc == 0) { + int linger = 0; + CHECK(zmq_setsockopt(receiver_, ZMQ_LINGER, &linger, sizeof(linger)) == 0); return 0; } else { @@ -91,6 +96,7 @@ public: if (rank == rank_) continue; std::string ip_port(endpoints[i]); senders_[rank] = zmq_socket(context_, ZMQ_DEALER); + // NOTE(feiga): set linger to 0, otherwise will hang int rc = zmq_connect(senders_[rank], ("tcp://" + ip_port).c_str()); if (rc != 0) { Log::Error("Failed to connect the socket for sender, rank = %d, " @@ -104,10 +110,22 @@ public: void Finalize() override { active_ = false; - // zmq_term(context_); - zmq_close(receiver_); - for (auto& p : senders_) if (p) zmq_close(p); - zmq_ctx_destroy(context_); + for (int i = 0; i < senders_.size(); ++i) { + if (i != rank_) { + int linger = 0; + CHECK(zmq_setsockopt(senders_[i], ZMQ_LINGER, &linger, sizeof(linger)) == 0); + int rc = zmq_close(senders_[i]); + CHECK(rc == 0); + } + } + int linger = 0; + CHECK(zmq_setsockopt(receiver_, ZMQ_LINGER, &linger, sizeof(linger)) == 0); + int rc = zmq_close(receiver_); + CHECK(rc == 0); + + CHECK(zmq_ctx_shutdown(context_)==0); + // zmq_ctx_term(context_); + Log::Info("zmq finalize: close context\n"); } bool active() const override { return active_; } @@ -151,7 +169,7 @@ public: MessagePtr& msg = *msg_ptr; msg->data().clear(); CHECK(msg.get()); - recv_size = zmq_recv(receiver_, msg->header(), Message::kHeaderSize, 0); + recv_size = zmq_recv(receiver_, msg->header(), Message::kHeaderSize, ZMQ_DONTWAIT); if (recv_size < 0) { return -1; } CHECK(Message::kHeaderSize == recv_size); @@ -236,6 +254,7 @@ protected: std::vector senders_; int rank_; int size_; + std::vector machine_lists_; }; } // namespace multiverso diff --git a/src/communicator.cpp b/src/communicator.cpp index 753f3d3..59c122b 100644 --- a/src/communicator.cpp +++ b/src/communicator.cpp @@ -33,7 +33,7 @@ Communicator::Communicator() : Actor(actor::kCommunicator) { } Communicator::~Communicator() { - + recv_thread_->join(); } void Communicator::Main() { @@ -78,8 +78,9 @@ void Communicator::Communicate() { MessagePtr msg(new Message()); size_t size = net_util_->Recv(&msg); if (size == -1) { - Log::Debug("recv return -1\n"); - break; + continue; + // Log::Debug("recv return -1\n"); + // break; } if (size > 0) { // a message received diff --git a/src/zoo.cpp b/src/zoo.cpp index c0836d4..bf76dfc 100644 --- a/src/zoo.cpp +++ b/src/zoo.cpp @@ -50,6 +50,8 @@ void Zoo::Stop(bool finalize_net) { if (!MV_CONFIG_ma) { StopPS(); } // Stop the network if (finalize_net) net_util_->Finalize(); + for (auto actor : zoo_) delete actor.second; + Log::Info("Multiverso Shutdown successfully\n"); } int Zoo::rank() const { return NetInterface::Get()->rank(); }