adding option for zmq build
This commit is contained in:
Родитель
575bd37755
Коммит
8aebc8c2cc
|
@ -4,10 +4,12 @@ PROJECT(MULTIVERSO)
|
|||
|
||||
OPTION(USE_HDFS "won't use hdfs on default, set ON to enable" OFF)
|
||||
OPTION(TEST "Build all tests." ON)
|
||||
OPTION(USE_ZMQ "weather to build with ZeroMQ.(default: OFF)" OFF)
|
||||
|
||||
find_package(MPI REQUIRED)
|
||||
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -std=c++11")
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
|
||||
|
||||
if(USE_HDFS)
|
||||
ADD_DEFINITIONS(-DMULTIVERSO_USE_HDFS)
|
||||
|
|
|
@ -34,10 +34,10 @@ public:
|
|||
// virtual void Allreduce(void* data, size_t count, int type, int type_size);
|
||||
|
||||
// \return 1. > 0 sended size 2. = 0 not sended 3. < 0 net error
|
||||
virtual int Send(MessagePtr& msg) = 0;
|
||||
virtual size_t Send(MessagePtr& msg) = 0;
|
||||
|
||||
// \return 1. > 0 received size 2. = 0 not received 3. < 0 net error
|
||||
virtual int Recv(MessagePtr* msg) = 0;
|
||||
virtual size_t Recv(MessagePtr* msg) = 0;
|
||||
|
||||
// Blocking, send raw data to rank
|
||||
virtual void SendTo(int rank, char* buf, int len) const = 0;
|
||||
|
|
|
@ -192,7 +192,7 @@ public:
|
|||
// return size;
|
||||
//}
|
||||
|
||||
int Send(MessagePtr& msg) override {
|
||||
size_t Send(MessagePtr& msg) override {
|
||||
if (msg.get()) { send_queue_.Push(msg); }
|
||||
|
||||
if (last_handle_.get() != nullptr && !last_handle_->Test()) {
|
||||
|
@ -227,7 +227,7 @@ public:
|
|||
// return RecvMsgFrom(status.MPI_SOURCE, msg);
|
||||
//}
|
||||
|
||||
int Recv(MessagePtr* msg) override {
|
||||
size_t Recv(MessagePtr* msg) override {
|
||||
MPI_Status status;
|
||||
int flag;
|
||||
// non-blocking probe whether message comes
|
||||
|
@ -286,11 +286,11 @@ public:
|
|||
MV_MPI_CALL(MPI_Wait(&send_request, &status));
|
||||
}
|
||||
|
||||
int SerializeAndSend(MessagePtr& msg, MPIMsgHandle* msg_handle) {
|
||||
size_t SerializeAndSend(MessagePtr& msg, MPIMsgHandle* msg_handle) {
|
||||
|
||||
CHECK_NOTNULL(msg_handle);
|
||||
MONITOR_BEGIN(MPI_NET_SEND_SERIALIZE);
|
||||
int size = sizeof(size_t) + Message::kHeaderSize;
|
||||
size_t size = sizeof(size_t) + Message::kHeaderSize;
|
||||
for (auto& data : msg->data()) size += sizeof(size_t) + data.size();
|
||||
if (size > send_size_) {
|
||||
send_buffer_ = (char*)realloc(send_buffer_, size);
|
||||
|
@ -315,7 +315,7 @@ public:
|
|||
return size;
|
||||
}
|
||||
|
||||
int RecvAndDeserialize(int src, int count, MessagePtr* msg_ptr) {
|
||||
size_t RecvAndDeserialize(int src, int count, MessagePtr* msg_ptr) {
|
||||
if (!msg_ptr->get()) msg_ptr->reset(new Message());
|
||||
MessagePtr& msg = *msg_ptr;
|
||||
msg->data().clear();
|
||||
|
|
|
@ -88,7 +88,7 @@ public:
|
|||
CHECK_NOTNULL(context_);
|
||||
size_ = size;
|
||||
senders_.resize(size_);
|
||||
for (int i = 0; i < size; ++i) {
|
||||
for (auto i = 0; i < size; ++i) {
|
||||
int rank = ranks[i];
|
||||
std::string ip_port(endpoints[i]);
|
||||
if (ip_port == receiver_.endpoint) {
|
||||
|
@ -109,7 +109,7 @@ public:
|
|||
}
|
||||
|
||||
void Finalize() override {
|
||||
for (int i = 0; i < senders_.size(); ++i) {
|
||||
for (auto i = 0; i < senders_.size(); ++i) {
|
||||
if (i != rank_) {
|
||||
int linger = 0;
|
||||
CHECK(zmq_setsockopt(senders_[i].socket, ZMQ_LINGER, &linger, sizeof(linger)) == 0);
|
||||
|
@ -272,4 +272,4 @@ protected:
|
|||
|
||||
#endif // MULTIVERSO_USE_ZEROMQ
|
||||
|
||||
#endif // MULTIVERSO_NET_ZMQ_NET_H_
|
||||
#endif // MULTIVERSO_NET_ZMQ_NET_H_
|
||||
|
|
|
@ -1,18 +1,29 @@
|
|||
include_directories(${MPI_CXX_INCLUDE_PATH})
|
||||
|
||||
if (NOT USE_ZMQ)
|
||||
ADD_DEFINITIONS(-DMULTIVERSO_USE_MPI)
|
||||
else()
|
||||
ADD_DEFINITIONS(-DMULTIVERSO_USE_ZMQ)
|
||||
endif()
|
||||
|
||||
find_package(OpenMP)
|
||||
if (OPENMP_FOUND)
|
||||
message("OpenMP found")
|
||||
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OpenMP_C_FLAGS}")
|
||||
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OpenMP_CXX_FLAGS}")
|
||||
if (NOT USE_ZMQ)
|
||||
find_package(OpenMP)
|
||||
if (OPENMP_FOUND)
|
||||
message("OpenMP found")
|
||||
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OpenMP_C_FLAGS}")
|
||||
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OpenMP_CXX_FLAGS}")
|
||||
endif()
|
||||
endif()
|
||||
|
||||
set(MULTIVERSO_SRC actor.cpp communicator.cpp controller.cpp dashboard.cpp multiverso.cpp net.cpp node.cpp server.cpp table.cpp table/array_table.cpp table/matrix_table.cpp table/sparse_matrix_table.cpp table/matrix.cpp timer.cpp updater/updater.cpp util/configure.cpp io/hdfs_stream.cpp io/io.cpp io/local_stream.cpp util/log.cpp util/net_util.cpp util/quantization_util.cpp worker.cpp zoo.cpp c_api.cpp util/allocator.cpp table_factory.cpp blob.cpp)
|
||||
|
||||
add_library(multiverso SHARED ${MULTIVERSO_SRC})
|
||||
target_link_libraries(multiverso ${MPI_LIBRARY})
|
||||
add_library(imultiverso ${MULTIVERSO_SRC})
|
||||
if (NOT USE_ZMQ)
|
||||
target_link_libraries(multiverso ${MPI_LIBRARY})
|
||||
else()
|
||||
target_link_libraries(multiverso zmq)
|
||||
endif()
|
||||
|
||||
install (TARGETS multiverso DESTINATION lib)
|
||||
if (UNIX)
|
||||
|
|
Загрузка…
Ссылка в новой задаче