ZeroMQ logic: simple test OK, needs further testing

This commit is contained in:
feiga 2016-02-19 21:11:20 +08:00
Родитель 711a9fda11
Коммит 2e88520b6c
12 изменённых файлов: 157 добавлений и 91 удалений

Просмотреть файл

@ -72,6 +72,7 @@
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='release|x64'">
<IncludePath>$(ThirdPartyPath)/ZeroMQ 4.0.4/include;$(MSMPI_INC);$(ProjectDir)\include;$(VC_IncludePath);$(WindowsSDK_IncludePath);</IncludePath>
<LibraryPath>$(ThirdPartyPath)/ZeroMQ 4.0.4/lib;$(VC_LibraryPath_x64);$(WindowsSDK_LibraryPath_x64);</LibraryPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='debug|x64'">
<IncludePath>$(ThirdPartyPath)\ZeroMQ 4.0.4\include;$(MSMPI_INC);$(ProjectDir)\include;$(VC_IncludePath);$(WindowsSDK_IncludePath);</IncludePath>
@ -127,7 +128,7 @@
<Optimization>MaxSpeed</Optimization>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>MULTIVERSO_USE_MPI;WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
@ -136,7 +137,7 @@
<OptimizeReferences>true</OptimizeReferences>
</Link>
<Lib>
<AdditionalDependencies>msmpi.lib</AdditionalDependencies>
<AdditionalDependencies>msmpi.lib;libzmq-v120-mt-4_0_4.lib</AdditionalDependencies>
</Lib>
<Lib>
<AdditionalLibraryDirectories>C:\Program Files (x86)\Microsoft SDKs\MPI\Lib\x64</AdditionalLibraryDirectories>
@ -150,6 +151,7 @@
<ClInclude Include="include\multiverso\message.h" />
<ClInclude Include="include\multiverso\multiverso.h" />
<ClInclude Include="include\multiverso\net.h" />
<ClInclude Include="include\multiverso\net\mpi_net.h" />
<ClInclude Include="include\multiverso\net\zmq_net.h" />
<ClInclude Include="include\multiverso\node.h" />
<ClInclude Include="include\multiverso\server.h" />
@ -169,7 +171,7 @@
<ClCompile Include="src\communicator.cpp" />
<ClCompile Include="src\controller.cpp" />
<ClCompile Include="src\multiverso.cpp" />
<ClCompile Include="src\mpi_net.cpp" />
<ClCompile Include="src\net.cpp" />
<ClCompile Include="src\node.cpp" />
<ClCompile Include="src\server.cpp" />
<ClCompile Include="src\table.cpp" />

Просмотреть файл

@ -59,7 +59,10 @@
<Filter>util</Filter>
</ClInclude>
<ClInclude Include="include\multiverso\net\zmq_net.h">
<Filter>system</Filter>
<Filter>net</Filter>
</ClInclude>
<ClInclude Include="include\multiverso\net\mpi_net.h">
<Filter>net</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
@ -75,6 +78,9 @@
<Filter Include="include\table">
<UniqueIdentifier>{eb628a56-01c9-4c87-a995-9504264c0e54}</UniqueIdentifier>
</Filter>
<Filter Include="net">
<UniqueIdentifier>{c720657f-d601-4776-956c-b2f892a1402c}</UniqueIdentifier>
</Filter>
</ItemGroup>
<ItemGroup>
<ClCompile Include="src\actor.cpp">
@ -107,11 +113,11 @@
<ClCompile Include="src\node.cpp">
<Filter>system</Filter>
</ClCompile>
<ClCompile Include="src\mpi_net.cpp">
<Filter>system</Filter>
</ClCompile>
<ClCompile Include="src\util\net_util.cpp">
<Filter>util</Filter>
</ClCompile>
<ClCompile Include="src\net.cpp">
<Filter>system</Filter>
</ClCompile>
</ItemGroup>
</Project>

Просмотреть файл

@ -9,14 +9,14 @@
using namespace multiverso;
void TestKV() {
void TestKV(int argc, char* argv[]) {
Log::Info("Test KV map \n");
// ----------------------------------------------------------------------- //
// this is a demo of distributed hash table to show how to use the multiverso
// ----------------------------------------------------------------------- //
// 1. Start the Multiverso engine ---------------------------------------- //
MultiversoInit();
MultiversoInit(&argc, argv);
// 2. To create the shared table ----------------------------------------- //
@ -63,10 +63,10 @@ void TestKV() {
MultiversoShutDown();
}
void TestArray() {
void TestArray(int argc, char* argv[]) {
Log::Info("Test Array \n");
MultiversoInit();
MultiversoInit(&argc, argv);
ArrayWorker<float>* shared_array = new ArrayWorker<float>(10);
ArrayServer<float>* server_array = new ArrayServer<float>(10);
@ -99,23 +99,32 @@ void TestArray() {
}
void TestNet() {
void TestNet(int argc, char* argv[]) {
NetInterface* net = NetInterface::Get();
net->Init();
net->Init(&argc, argv);
char* hi = "hello, world";
char* hi1 = "hello, world";
char* hi2 = "hello, c++";
char* hi3 = "hello, multiverso";
if (net->rank() == 0) {
MessagePtr msg = std::make_unique<Message>();
msg->set_src(0);
msg->set_dst(1);
msg->Push(Blob(hi, 13));
msg->Push(Blob(hi1, 13));
msg->Push(Blob(hi2, 11));
msg->Push(Blob(hi3, 18));
net->Send(msg);
Log::Info("rank 0 send\n");
} else if (net->rank() == 1) {
MessagePtr msg = std::make_unique<Message>();
net->Recv(&msg);
while (net->Recv(&msg) == 0);
Log::Info("rank 1 recv\n");
CHECK(strcmp(msg->data()[0].data(), hi) == 0);
// CHECK(strcmp(msg->data()[0].data(), hi) == 0);
std::vector<Blob> recv_data = msg->data();
CHECK(recv_data.size() == 3);
for (int i = 0; i < msg->size(); ++i) {
Log::Info("%s\n", recv_data[i].data());
}
}
net->Finalize();
@ -130,13 +139,18 @@ void TestIP() {
int main(int argc, char* argv[]) {
// Log::ResetLogLevel(LogLevel::Debug);
if (argc == 2) {
if (strcmp(argv[1], "kv") == 0) TestKV();
else if (strcmp(argv[1], "array") == 0) TestArray();
else if (strcmp(argv[1], "net") == 0) TestNet();
if (strcmp(argv[1], "kv") == 0) TestKV(argc, argv);
else if (strcmp(argv[1], "array") == 0) TestArray(argc, argv);
else if (strcmp(argv[1], "net") == 0) TestNet(argc, argv);
else if (strcmp(argv[1], "ip") == 0) TestIP();
else CHECK(false);
} else if (argc == 4) {
if (strcmp(argv[3], "kv") == 0) TestKV(argc, argv);
else if (strcmp(argv[3], "array") == 0) TestArray(argc, argv);
else if (strcmp(argv[3], "net") == 0) TestNet(argc, argv);
else if (strcmp(argv[3], "ip") == 0) TestIP();
} else {
TestArray();
TestArray(argc, argv);
}
return 0;
}

Просмотреть файл

@ -11,7 +11,9 @@ enum Role {
kAll = 3
};
void MultiversoInit(int role = kAll);
void MultiversoInit(int* argc = nullptr,
char* argv[] = nullptr,
int role = kAll);
void MultiversoBarrier();

Просмотреть файл

@ -5,6 +5,8 @@
#include <string>
#include "multiverso/message.h"
#define MULTIVERSO_USE_MPI
namespace multiverso {
// Interface of inter process communication method
class NetInterface {

Просмотреть файл

@ -1,19 +1,17 @@
#ifndef MULTIVERSO_NET_MPI_NET_H_
#define MULTIVERSO_NET_MPI_NET_H_
#ifdef MULTIVERSO_USE_MPI
#include "multiverso/net.h"
#include <limits>
#include <mutex>
#include "multiverso/message.h"
// #include "multiverso/net/zmq_net.h"
#include "multiverso/util/log.h"
// TODO(feiga) remove this
#define MULTIVERSO_USE_MPI
// TODO(feiga): move to separate files
#ifdef MULTIVERSO_USE_MPI
#include <mpi.h>
#endif
#ifdef _MSC_VER
@ -22,7 +20,6 @@
namespace multiverso {
#ifdef MULTIVERSO_USE_MPI
class MPINetWrapper : public NetInterface {
public:
MPINetWrapper() : more_(std::numeric_limits<char>::max()) {}
@ -36,14 +33,16 @@ public:
MPI_Query_thread(&thread_provided_);
if (thread_provided_ < MPI_THREAD_SERIALIZED) {
Log::Fatal("At least MPI_THREAD_SERIALIZED supported is needed by multiverso.\n");
} else if (thread_provided_ == MPI_THREAD_SERIALIZED) {
Log::Info("multiverso MPI-Net is initialized under MPI_THREAD_SERIALIZED mode.\n");
} else if (thread_provided_ == MPI_THREAD_MULTIPLE) {
Log::Debug("multiverso MPI-Net is initialized under MPI_THREAD_MULTIPLE mode.\n");
}
}
else if (thread_provided_ == MPI_THREAD_SERIALIZED) {
Log::Info("multiverso MPI-Net is initialized under MPI_THREAD_SERIALIZED mode.\n");
}
else if (thread_provided_ == MPI_THREAD_MULTIPLE) {
Log::Debug("multiverso MPI-Net is initialized under MPI_THREAD_MULTIPLE mode.\n");
}
MPI_Comm_rank(MPI_COMM_WORLD, &rank_);
MPI_Comm_size(MPI_COMM_WORLD, &size_);
Log::Debug("%s net util inited, rank = %d, size = %d\n",
Log::Debug("%s net util inited, rank = %d, size = %d\n",
name().c_str(), rank(), size());
}
@ -57,9 +56,11 @@ public:
if (thread_provided_ == MPI_THREAD_SERIALIZED) {
std::lock_guard<std::mutex> lock(mutex_);
return SendMsg(msg);
} else if (thread_provided_ == MPI_THREAD_MULTIPLE) {
}
else if (thread_provided_ == MPI_THREAD_MULTIPLE) {
return SendMsg(msg);
} else {
}
else {
CHECK(false);
return 0;
}
@ -76,9 +77,11 @@ public:
// block receive with lock guard
std::lock_guard<std::mutex> lock(mutex_);
return RecvMsg(msg);
} else if (thread_provided_ == MPI_THREAD_MULTIPLE) {
}
else if (thread_provided_ == MPI_THREAD_MULTIPLE) {
return RecvMsg(msg);
} else {
}
else {
CHECK(false);
return 0;
}
@ -86,8 +89,8 @@ public:
size_t SendMsg(const MessagePtr& msg) {
size_t size = Message::kHeaderSize;
MPI_Send(msg->header(), Message::kHeaderSize, MPI_BYTE,
msg->dst(), 0, MPI_COMM_WORLD);
MPI_Send(msg->header(), Message::kHeaderSize, MPI_BYTE,
msg->dst(), 0, MPI_COMM_WORLD);
// Send multiple msg
for (auto& blob : msg->data()) {
CHECK_NOTNULL(blob.data());
@ -96,7 +99,7 @@ public:
size += blob.size();
}
// Send an extra over tag indicating the finish of this Message
MPI_Send(&more_, sizeof(char), MPI_BYTE, msg->dst(),
MPI_Send(&more_, sizeof(char), MPI_BYTE, msg->dst(),
0, MPI_COMM_WORLD);
// Log::Debug("MPI-Net: rank %d send msg size = %d\n", rank(), size+4);
return size + sizeof(char);
@ -109,14 +112,13 @@ public:
MessagePtr& msg = *msg_ptr;
msg->data().clear();
MPI_Status status;
MPI_Recv(msg->header(), Message::kHeaderSize,
MPI_BYTE, MPI_ANY_SOURCE,
0, MPI_COMM_WORLD, &status);
MPI_Recv(msg->header(), Message::kHeaderSize,
MPI_BYTE, MPI_ANY_SOURCE,
0, MPI_COMM_WORLD, &status);
size_t size = Message::kHeaderSize;
int i = 0;
int flag;
int num_probe = 0;
while (true) {
while (true) {
int count;
CHECK(MPI_SUCCESS == MPI_Probe(msg->src(), 0, MPI_COMM_WORLD, &status));
//CHECK(MPI_SUCCESS == MPI_Iprobe(msg->src(), 0, MPI_COMM_WORLD, &flag, &status));
@ -127,12 +129,12 @@ public:
MPI_Get_count(&status, MPI_BYTE, &count);
Blob blob(count);
// We only receive from msg->src() until we recv the overtag msg
MPI_Recv(blob.data(), count, MPI_BYTE, msg->src(),
0, MPI_COMM_WORLD, &status);
MPI_Recv(blob.data(), count, MPI_BYTE, msg->src(),
0, MPI_COMM_WORLD, &status);
size += count;
if (count == sizeof(char)) {
if (blob.As<char>() == more_) break;
CHECK(1+1 != 2);
CHECK(1 + 1 != 2);
}
msg->Push(blob);
// Log::Debug(" VLOG(RECV): i = %d\n", ++i);
@ -142,7 +144,7 @@ public:
}
private:
const char more_;
const char more_;
std::mutex mutex_;
int thread_provided_;
int inited_;
@ -150,20 +152,8 @@ private:
int size_;
};
#endif
NetInterface* NetInterface::Get() {
#ifdef MULTIVERSO_USE_ZMQ
Log::Fatal("Not implemented yet\n");
static ZeroMQNetWrapper net_impl;
#else
#ifdef MULTIVERSO_USE_MPI
static MPINetWrapper net_impl;
#endif
#endif
return &net_impl; // net_util.get();
}
}
#endif // MULTIVERSO_USE_MPI
#endif // MULTIVERSO_NET_MPI_NET_H_

Просмотреть файл

@ -1,35 +1,52 @@
#ifndef MULTIVERSO_NET_ZMQ_NET_H_
#define MULTIVERSO_NET_ZMQ_NET_H_
#define MULTIVERSO_USE_ZEROMQ
#ifdef MULTIVERSO_USE_ZEROMQ
#ifdef MULTIVERSO_USE_ZMQ
#include "multiverso/net.h"
#include <limits>
#include "multiverso/util/log.h"
#include "multiverso/message.h"
#include "multiverso/util/log.h"
#include "multiverso/util/net_util.h"
#include <zmq.h>
namespace multiverso {
class ZeroMQNetWrapper : public NetInterface {
class ZMQNetWrapper : public NetInterface {
public:
// argc >= 2
// argv[0]: machine file, format is same with MPI machine file
// argv[1]: port used
void Init(int* argc, char** argv) override {
// get machine file
CHECK(*argc > 2);
std::vector<std::string> machine_lists;
ParseMachineFile(argv[1], &machine_lists);
int port = atoi(argv[2]);
size_ = static_cast<int>(machine_lists.size());
std::unordered_set<std::string> local_ip;
net::GetLocalIPAddress(&local_ip);
// responder socket
context_ = zmq_ctx_new();
responder_ = zmq_socket(context_, ZMQ_REP);
CHECK(zmq_bind(responder_, "tcp://*:5555") == 0);
// get machine file
// format is same with MPI machine file
CHECK(zmq_bind(responder_,
("tcp://*:" + std::to_string(port)).c_str()) == 0);
// TODO(feiga): parse the machine list config file to get the ip
std::vector<std::string> machine_lists;
ParseMachineFile(argv[0], &machine_lists);
size_ = static_cast<int>(machine_lists.size());
for (auto ip : machine_lists) {
void* requester = zmq_socket(context_, ZMQ_REQ);
zmq_connect(requester, ("tcp://" + ip).c_str());
requester_.push_back(requester);
if (local_ip.find(ip) != local_ip.end()) {
rank_ = static_cast<int>(requester_.size());
requester_.push_back(nullptr);
} else {
void* requester = zmq_socket(context_, ZMQ_REQ);
int rc = zmq_connect(requester,
("tcp://" + ip + ":" + std::to_string(port)).c_str());
CHECK(rc == 0);
requester_.push_back(requester);
}
}
Log::Info("%s net util inited, rank = %d, size = %d\n",
@ -102,14 +119,18 @@ private:
std::vector<std::string>* result) {
CHECK_NOTNULL(result);
FILE* file;
char str[128];
char str[32];
#ifdef _MSC_VER
fopen_s(&file, filename.c_str(), "r");
#else
file = fopen(filename.c_str(), "r");
#endif
CHECK_NOTNULL(file);
#ifdef _MSC_VER
while (fscanf_s(file, "%s", &str) > 0) {
#else
while (fscanf(file, "%s", &str) > 0) {
#endif
result->push_back(str);
}
fclose(file);
@ -119,8 +140,6 @@ private:
void* context_;
void* responder_;
std::vector<void*> requester_;
const int more_;
int inited_;
int rank_;
int size_;
};

Просмотреть файл

@ -23,7 +23,7 @@ public:
static Zoo* Get() { static Zoo zoo; return &zoo; };
// Start all actors
void Start(int role);
void Start(int* argc, char* argv[], int role);
// Stop all actors
void Stop(bool finalize_net);

Просмотреть файл

@ -34,8 +34,11 @@ Communicator::Communicator() : Actor(actor::kCommunicator) {
void Communicator::Main() {
// TODO(feiga): join the thread, make sure it exit properly
//recv_thread_.reset(new std::thread(&Communicator::Communicate, this));
//Actor::Main();
#ifdef MULTIVERSO_USE_ZMQ
recv_thread_.reset(new std::thread(&Communicator::Communicate, this));
Actor::Main();
#else
#ifdef MULTIVERSO_USE_MPI
MessagePtr msg;
while (mailbox_->Alive()) {
while (mailbox_->TryPop(msg)) {
@ -44,6 +47,8 @@ void Communicator::Main() {
size_t size = net_util_->Recv(&msg);
if (size > 0) LocalForward(msg);
}
#endif
#endif
}
void Communicator::ProcessMessage(MessagePtr& msg) {

Просмотреть файл

@ -4,8 +4,8 @@
namespace multiverso {
void MultiversoInit(int role) {
Zoo::Get()->Start(role);
void MultiversoInit(int* argc, char* argv[], int role) {
Zoo::Get()->Start(argc, argv, role);
}
void MultiversoShutDown(bool finalize_net) {

26
next/src/net.cpp Normal file
Просмотреть файл

@ -0,0 +1,26 @@
#include "multiverso/net.h"
#include <limits>
#include <mutex>
#include "multiverso/message.h"
#include "multiverso/util/log.h"
#include "multiverso/net/zmq_net.h"
#include "multiverso/net/mpi_net.h"
namespace multiverso {
// Returns a pointer to the network backend instance shared by all callers.
// The backend type is fixed at compile time: the ZeroMQ wrapper when
// MULTIVERSO_USE_ZMQ is defined, otherwise the MPI wrapper (the default).
// The instance is a function-local static, so it is constructed on the
// first call and the same address is returned on every subsequent call.
NetInterface* NetInterface::Get() {
#if defined(MULTIVERSO_USE_ZMQ)
  static ZMQNetWrapper backend;
#else
  // No ZeroMQ requested: fall back to MPI.
  static MPINetWrapper backend;
#endif
  return &backend;
}
}

Просмотреть файл

@ -15,11 +15,11 @@ Zoo::Zoo() {}
Zoo::~Zoo() {}
void Zoo::Start(int role) {
void Zoo::Start(int* argc, char** argv, int role) {
Log::Debug("Zoo started\n");
CHECK(role >= 0 && role <= 3);
net_util_ = NetInterface::Get();
net_util_->Init();
net_util_->Init(argc, argv);
nodes_.resize(size());
nodes_[rank()].rank = rank();
nodes_[rank()].role = role;