Merge remote-tracking branch 'Microsoft/master'

This commit is contained in:
liming-vie 2016-07-14 19:26:14 +08:00
Родитель ad21249a37 6813999774
Коммит dce2dd0d89
17 изменённых файлов: 41 добавлений и 81 удалений

Просмотреть файл

@ -2,9 +2,10 @@ language: cpp
sudo: required
dist: trusty
compiler:
- clang
- gcc
# solving MPI conflict https://docs.travis-ci.com/user/languages/cpp#OpenMP-projects
before_install:
- test -n $CC && unset CC
- test -n $CXX && unset CXX
install:
- sudo apt-get install -y libopenmpi-dev openmpi-bin build-essential
@ -48,3 +49,8 @@ script:
notifications:
email: false
matrix:
include:
- compiler: gcc
- compiler: clang

Просмотреть файл

@ -31,7 +31,6 @@ aux_source_directory(${SRCDIR}/regular SRC_REGULAR)
aux_source_directory(${SRCDIR}/updater SRC_UPDATER)
aux_source_directory(${SRCDIR}/util SRC_UTIL)
set(SRC ${SRC_MODEL} ${SRC_OBJECTIVE} ${SRC_REGULAR} ${SRC_UPDATER} ${SRC_UTIL} ${MULTIVERSO_SRC}/table/array_table.cpp ${SRC_ROOT})
message(${SRC})
add_executable(LogReg ${SRC})

Просмотреть файл

@ -1,5 +1,5 @@
Logistic Regression
======
The Logoisti Regression tool is a parallelization of the logistic regression and FTRL algorithm on top of DMTK parameter server. It is a easy-to-use tool for training model on big data with numbers of machines.
The Logistic Regression tool is a parallel implementation of the logistic regression on top of multiverso. It is a easy-to-use tool for training model on big data with numbers of machines.
For more details, please refer to [wiki](https://github.com/Microsoft/multiverso/wiki/Logistic-Regression).

Просмотреть файл

@ -2,8 +2,6 @@ cmake_minimum_required(VERSION 2.8)
PROJECT(WORDEMBEDDING)
set(MULTIVERSO_DIR ~/multiverso)
find_package(MPI REQUIRED)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -std=c++11 -Wno-sign-compare -fno-omit-frame-pointer -fopenmp")
@ -20,7 +18,6 @@ set(SRCDIR ${PROJECT_SOURCE_DIR}/src)
aux_source_directory(${PROJECT_SOURCE_DIR}/src SRC_ROOT)
set(SRC ${MULTIVERSO_SRC} ${SRC_ROOT})
message(${SRC})
add_executable(WORDEMBEDDING ${SRC})

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 20 KiB

Двоичные данные
Applications/WordEmbedding/example/imges/WS 353 google vs dmtk.png Normal file

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 16 KiB

Просмотреть файл

@ -24,6 +24,7 @@ namespace wordembedding {
int const BlockQueue::GetQueueSize() {
int size = -1;
//This operation is safe in here and more efficient.
//std::unique_lock<std::mutex> lock(mtx_);
size = queues_.size();
//lock.unlock();

Просмотреть файл

@ -82,10 +82,12 @@ namespace wordembedding {
void DistributedWordembedding::AddDeltaWordCount() {
int64 temp_word_count = communicator_->GetWordCount();
temp_word_count = WordEmbedding_->word_count_actual - temp_word_count;
if (temp_word_count > 0) {
communicator_->AddWordCount(temp_word_count);
multiverso::Log::Info("Add word count done.word count delta is %lld\n",
temp_word_count);
}
}
void DistributedWordembedding::StartWordCount() {
multiverso::Log::Info("Rank %d Start word count thread.\n", process_id_);
@ -195,7 +197,7 @@ namespace wordembedding {
data_block = GetBlockAndPrepareParameter();
data_block_count++;
}
//if use pipeline, training this datablock and getting parameters of next
//if use pipeline, training datablock and getting parameters of next
//datablock in parallel.
else {
#pragma omp parallel num_threads(option_->thread_cnt+1)

Просмотреть файл

@ -9,12 +9,12 @@ namespace wordembedding {
output_file = nullptr;
sw_file = nullptr;
endpoints_file = "";
hs = true;
negative_num = 0;
hs = false;
negative_num = 5;
output_binary = false;
sample = 0;
cbow = true;
embeding_size = 0;
embeding_size = 100;
thread_cnt = 1;
window_size = 5;
min_count = 5;
@ -208,8 +208,10 @@ namespace wordembedding {
void InitExpTable() {
expTable = (real *)malloc((kExpTableSize + 1) * sizeof(real));
for (int i = 0; i < kExpTableSize; i++) {
expTable[i] = exp((i / (real)kExpTableSize * 2 - 1) * kMaxExp); // Precompute the exp() table
expTable[i] = expTable[i] / (expTable[i] + 1); // Precompute f(x) = x / (x + 1)
// Precompute the exp() table
expTable[i] = exp((i / (real)kExpTableSize * 2 - 1) * kMaxExp);
// Precompute f(x) = x / (x + 1)
expTable[i] = expTable[i] / (expTable[i] + 1);
}
}
}

Просмотреть файл

@ -20,8 +20,10 @@ endif(USE_HDFS)
include_directories(${PROJECT_SOURCE_DIR}/include)
set(MULTIVERSO_DIR ${PROJECT_SOURCE_DIR})
ADD_SUBDIRECTORY(src)
ADD_SUBDIRECTORY(Test)
ADD_SUBDIRECTORY(Applications/WordEmbedding)
ADD_SUBDIRECTORY(Applications/LogisticRegression)
# TODO: more header files should be installed. Only c_api.h is installed so far

Просмотреть файл

@ -2,8 +2,7 @@
## Introduction
Multiverso is a parameter server framework for distributed machine learning.
This package can leverage multiple machines and GPUs to speed up the torch
programs.
This package can enable parallel training of torch program over multiple machines and GPUs.
## Requirements
Build multiverso successfully by following the [README > build](https://github.com/Microsoft/multiverso/blob/master/README.md#build).
@ -11,7 +10,7 @@ Build multiverso successfully by following the [README > build](https://github.c
## Installation
**NOTE**: Before installation, you need to make sure have `libmultiverso.so`
build successfully according to [Requirements](#requirements).
built successfully according to [Requirements](#requirements).
```
make install

Просмотреть файл

@ -58,7 +58,6 @@ int MV_NetBind(int rank, char* endpoint);
// \return 0 SUCCESS
// \return -1 FAIL
int MV_NetConnect(int* rank, char* endpoint[], int size);
void MV_NetClose(const char* endpoint);
void MV_NetFinalize();
} // namespace multiverso

Просмотреть файл

@ -26,8 +26,6 @@ public:
// Connect with other endpoints
virtual int Connect(int* rank, char* endpoints[], int size) = 0;
virtual void Close(const char* endpoint) = 0;
virtual bool active() const = 0;
virtual std::string name() const = 0;

Просмотреть файл

@ -139,10 +139,6 @@ public:
return -1;
}
void Close(const char*) override {
Log::Fatal("Shouldn't call this in MPI Net\n");
}
bool active() const { return inited_ != 0; }
int rank() const override { return rank_; }
int size() const override { return size_; }

Просмотреть файл

@ -22,15 +22,11 @@ MV_DEFINE_int(port, 55555 , "port used to communication");
class ZMQNetWrapper : public NetInterface {
public:
// argc >= 2
// argv[1]: machine file, format is same with MPI machine file
// argv[2]: port used
void Init(int* argc, char** argv) override {
// get machine file
if (active_) return;
// CHECK(*argc > 2);
ParseMachineFile(MV_CONFIG_machine_file, &machine_lists_);
int port = MV_CONFIG_port; // atoi(argv[2]);
int port = MV_CONFIG_port;
size_ = static_cast<int>(machine_lists_.size());
CHECK(size_ > 0);
@ -48,20 +44,14 @@ public:
receiver_.socket = zmq_socket(context_, ZMQ_DEALER);
receiver_.endpoint = ip + ":" + std::to_string(port);
int rc = zmq_bind(receiver_.socket, ("tcp://" + receiver_.endpoint).c_str());
endpoint_to_socket_[receiver_.endpoint] = receiver_.socket;
CHECK(rc == 0);
int linger = 0;
// CHECK(zmq_setsockopt(receiver_.socket, ZMQ_LINGER, &linger, sizeof(linger)) == 0);
} else {
Entity sender;
sender.socket = zmq_socket(context_, ZMQ_DEALER);
sender.endpoint = ip + ":" + std::to_string(port);
int rc = zmq_connect(sender.socket, ("tcp://" + sender.endpoint).c_str());
endpoint_to_socket_[sender.endpoint] = sender.socket;
CHECK(rc == 0);
senders_.push_back(sender);
int linger = 0;
// CHECK(zmq_setsockopt(sender.socket, ZMQ_LINGER, &linger, sizeof(linger)) == 0);
}
}
CHECK_NOTNULL(receiver_.socket);
@ -73,15 +63,16 @@ public:
virtual int Bind(int rank, char* endpoint) override {
rank_ = rank;
std::string ip_port(endpoint);
if (context_ == nullptr) { context_ = zmq_ctx_new(); }
if (context_ == nullptr) {
context_ = zmq_ctx_new();
}
CHECK_NOTNULL(context_);
if (receiver_.socket == nullptr)
receiver_.socket = zmq_socket(context_, ZMQ_DEALER);
receiver_.endpoint = ip_port;
int rc = zmq_bind(receiver_.socket, ("tcp://" + receiver_.endpoint).c_str());
endpoint_to_socket_[receiver_.endpoint] = receiver_.socket;
if (rc == 0) {
int linger = 0;
// CHECK(zmq_setsockopt(receiver_.socket, ZMQ_LINGER, &linger, sizeof(linger)) == 0);
return 0;
}
else {
@ -100,15 +91,12 @@ public:
for (int i = 0; i < size; ++i) {
int rank = ranks[i];
std::string ip_port(endpoints[i]);
// if (rank == rank_) continue;
if (ip_port == receiver_.endpoint) {
rank_ = rank;
continue;
}
senders_[rank].socket = zmq_socket(context_, ZMQ_DEALER);
senders_[rank].endpoint = ip_port;
endpoint_to_socket_[senders_[rank].endpoint] = senders_[rank].socket;
// NOTE(feiga): set linger to 0, otherwise will hang
int rc = zmq_connect(senders_[rank].socket, ("tcp://" + senders_[rank].endpoint).c_str());
if (rc != 0) {
Log::Error("Failed to connect the socket for sender, rank = %d, "
@ -120,32 +108,7 @@ public:
return 0;
}
void Close(const char* endpoint) override {
std::string str_endpoint(endpoint);
auto it = endpoint_to_socket_.find(str_endpoint);
if (it != endpoint_to_socket_.end()) {
Log::Info("Close endpoint %s\n", it->first.c_str());
CHECK_NOTNULL(it->second);
CHECK(zmq_close(it->second) == 0);
endpoint_to_socket_.erase(it);
if (endpoint_to_socket_.empty()) {
// Term the context when the last endpoint is closed properly
zmq_ctx_term(context_);
Log::Info("ZMQ Finalize sucessfully\n");
}
}
}
void Finalize() override {
//active_ = false;
//for (auto entity : senders_) {
// if (entity.socket != nullptr) {
// zmq_disconnect(entity.socket, entity.endpoint.c_str());
// Close(entity.endpoint.c_str());
// }
//}
//zmq_unbind(receiver_.socket, receiver_.endpoint.c_str());
// Close(receiver_.endpoint.c_str());
for (int i = 0; i < senders_.size(); ++i) {
if (i != rank_) {
int linger = 0;
@ -164,7 +127,9 @@ public:
Log::Info("zmq finalize: before close context\n");
CHECK(zmq_ctx_shutdown(context_)==0);
CHECK_NOTNULL(context_);
zmq_ctx_term(context_);
context_ = nullptr;
Log::Info("zmq finalize: close context\n");
}
@ -299,13 +264,9 @@ protected:
Entity receiver_;
std::vector<Entity> senders_;
// void* receiver_;
// std::vector<void*> senders_;
int rank_;
int size_;
std::vector<std::string> machine_lists_;
std::unordered_map<std::string, void*> endpoint_to_socket_;
};
} // namespace multiverso

Просмотреть файл

@ -57,10 +57,6 @@ int MV_NetConnect(int* ranks, char* endpoints[], int size) {
return NetInterface::Get()->Connect(ranks, endpoints, size);
}
void MV_NetClose(const char* endpoint) {
NetInterface::Get()->Close(endpoint);
}
void MV_NetFinalize() {
NetInterface::Get()->Finalize();
}

Просмотреть файл

@ -32,6 +32,8 @@ int ParsePSRole(const std::string& ps_role) {
if (ps_role == "default") return Role::ALL;
return -1;
}
const int kController = 0;
} // namespace
void Zoo::Start(int* argc, char** argv) {
@ -75,7 +77,7 @@ void Zoo::StartPS() {
mailbox_.reset(new MtQueue<MessagePtr>);
// NOTE(feiga): the start order is non-trivial, communicator should be last.
if (rank() == 0) { Actor* controler = new Controller(); controler->Start(); }
if (rank() == kController) { Actor* controler = new Controller(); controler->Start(); }
Actor* communicator = new Communicator();
communicator->Start();
// activate the system
@ -110,7 +112,7 @@ void Zoo::StopPS() {
void Zoo::RegisterNode() {
MessagePtr msg(new Message());
msg->set_src(rank());
msg->set_dst(0);
msg->set_dst(kController);
msg->set_type(MsgType::Control_Register);
msg->Push(Blob(&nodes_[rank()], sizeof(Node)));
SendTo(actor::kCommunicator, msg);
@ -158,7 +160,7 @@ void Zoo::FinishTrain() {
void Zoo::Barrier() {
MessagePtr msg(new Message());
msg->set_src(rank());
msg->set_dst(0); // rank 0 acts as the controller master.
msg->set_dst(kController);
msg->set_type(MsgType::Control_Barrier);
SendTo(actor::kCommunicator, msg);