merge log from multiverso-next

This commit is contained in:
Qiwei Ye 2016-04-14 12:06:41 +08:00
Parent b156912ab6
Commit c8e2d7e82e
97 changed files with 611 additions and 2599 deletions

@ -1,41 +1,3 @@
multiverso
==========
Multiverso is a parameter-server-based framework for training machine learning models on big data across a large number of machines. It is currently a standard C++ library and provides a series of friendly programming interfaces. With these easy-to-use APIs, machine learning researchers and practitioners do not need to worry about system routines such as distributed model storage and operation, inter-process and inter-thread communication, multi-threading management, and so on.
Instead, they can focus on the core machine learning logic: data, model, and training.
For more details, please view our website [http://www.dmtk.io](http://www.dmtk.io).
Build
----------
**Linux** (Tested on Ubuntu 12.04)
0. Run ```cd third_party```
1. Run ```./install.sh``` to install the dependencies.
2. Run ```make all -j4``` to build multiverso.
**Cmake**
0. Run ``` cd third_party ```
1. Run ``` ./install.sh ``` to install the dependencies.
2. Run ``` cd .. ```
3. Run ``` mkdir build ```
4. Run ``` cd build ```
5. Run ``` cmake .. ```
6. Run ``` make ```
**Windows**
For Windows users, please refer to the README in the windows folder.
Related Projects
----------
Current distributed systems based on multiverso:
* [lightlda](https://github.com/Microsoft/lightlda): Scalable, fast, lightweight system for large scale topic modeling
* [distributed_word_embedding](https://github.com/Microsoft/distributed_word_embedding): Distributed system for word embedding
* [distributed_skipgram_mixture](https://github.com/Microsoft/distributed_skipgram_mixture): Distributed skipgram mixture for multi-sense word embedding
# Multiverso
New version

@ -1,179 +1,179 @@
#ifdef MULTIVERSO_USE_ZMQ
#include <cstring>
#include <algorithm>
#include "allreduce_engine.h"
#include "net_allreduce.h"
namespace multiverso {
AllreduceEngine::AllreduceEngine()
:block_start_(nullptr), block_len_(nullptr), buffer_(nullptr) {
}
void AllreduceEngine::Init(const AllreduceNetWrapper* linkers) {
linkers_ = linkers;
rank_ = linkers_->rank();
num_machines_ = linkers_->size();
bruck_map_ = linkers_->GetBruckMap();
recursive_halving_map_ = linkers_->GetRecursiveHalfingMap();
block_start_ = new int[num_machines_];
block_len_ = new int[num_machines_];
buffer_size_ = 1024 * 1024;
buffer_ = new char[buffer_size_];
}
AllreduceEngine::~AllreduceEngine() {
if (block_start_ != nullptr) { delete[]block_start_; }
if (block_len_ != nullptr) { delete[]block_len_; }
if (buffer_ != nullptr) { delete[] buffer_; }
}
void AllreduceEngine::Allreduce(char* input, int input_size, int type_size, char* output, ReduceFunction reducer) {
int count = input_size / type_size;
// If the data is small (few elements or few bytes), do it by all-gather to reduce the number of communication rounds.
if (count < num_machines_ || input_size < 4096) {
AllreduceByAllGather(input, input_size, type_size, output, reducer);
return;
}
// Assign a block of the input to every rank.
int step = (count + num_machines_ - 1) / num_machines_;
if (step < 1) {
step = 1;
}
block_start_[0] = 0;
for (int i = 0; i < num_machines_ - 1; ++i) {
block_len_[i] = step * type_size < input_size - block_start_[i] ? step * type_size : input_size - block_start_[i];
block_start_[i + 1] = block_start_[i] + block_len_[i];
}
block_len_[num_machines_ - 1] = input_size - block_start_[num_machines_ - 1];
//do reduce scatter
ReduceScatter(input, input_size, type_size, block_start_, block_len_, output, reducer);
//do all gather
Allgather(output, input_size, block_start_, block_len_, output);
}
void AllreduceEngine::AllreduceByAllGather(char* input, int input_size, int type_size, char* output, ReduceFunction reducer) {
//assign blocks
int all_size = input_size * num_machines_;
block_start_[0] = 0;
block_len_[0] = input_size;
for (int i = 1; i < num_machines_; ++i) {
block_start_[i] = block_start_[i - 1] + block_len_[i - 1];
block_len_[i] = input_size;
}
if (input_size*num_machines_ > buffer_size_) {
delete[] buffer_;
buffer_size_ = input_size*num_machines_;
buffer_ = new char[buffer_size_];
}
Allgather(input, all_size, block_start_, block_len_, buffer_);
for (int i = 1; i < num_machines_; ++i) {
reducer(buffer_ + block_start_[i], buffer_ + block_start_[0], input_size);
}
std::memcpy(output, buffer_, input_size);
}
void AllreduceEngine::Allgather(char* input, int send_size, char* output) {
//assign blocks
block_start_[0] = 0;
block_len_[0] = send_size;
for (int i = 1; i < num_machines_; ++i) {
block_start_[i] = block_start_[i - 1] + block_len_[i - 1];
block_len_[i] = send_size;
}
Allgather(input, send_size * num_machines_, block_start_, block_len_, output);
}
void AllreduceEngine::Allgather(char* input, int all_size, int* block_start, int* block_len, char* output) {
int write_ptr = 0;
std::memcpy(output, input, block_len[rank_]);
write_ptr += block_len[rank_];
int accumulated_block = 1;
for (int i = 0; i < bruck_map_.k; ++i) {
//send
int cur_block_size = (1 << i) < num_machines_ - accumulated_block ? (1 << i) : num_machines_ - accumulated_block;
int target = bruck_map_.out_ranks[i];
int send_len = 0;
for (int j = 0; j < cur_block_size; ++j) {
send_len += block_len[(rank_ + j) % num_machines_];
}
linkers_->Send(target, output, 0, send_len);
// receive
int incoming = bruck_map_.in_ranks[i];
int need_recv_cnt = 0;
for (int j = 0; j < cur_block_size; ++j) {
need_recv_cnt += block_len[(rank_ + accumulated_block + j) % num_machines_];
}
linkers_->Receive(incoming, output, write_ptr, need_recv_cnt);
write_ptr += need_recv_cnt;
accumulated_block += cur_block_size;
}
// Rotate right by block_start[rank_] bytes (implemented as three reversals) so every block lands at its global offset.
std::reverse<char*>(output, output + all_size);
std::reverse<char*>(output, output + block_start[rank_]);
std::reverse<char*>(output + block_start[rank_], output + all_size);
}
void AllreduceEngine::ReduceScatter(char* input, int input_size, int type_size, int* block_start, int* block_len, char* output, ReduceFunction reducer) {
bool is_powerof_2 = (num_machines_ & (num_machines_ - 1)) == 0 ? true : false;
if (!is_powerof_2) {
if (recursive_halving_map_.type == RecursiveHalvingNodeType::SendNeighbor) {
//send local data to neighbor first
linkers_->Send(recursive_halving_map_.neighbor, input, 0, input_size);
}
else if (recursive_halving_map_.type == RecursiveHalvingNodeType::ReciveNeighbor) {
// receive neighbor data first
int need_recv_cnt = input_size;
linkers_->Receive(recursive_halving_map_.neighbor, output, 0, need_recv_cnt);
reducer(output, input, input_size);
}
}
// start recursive halving
if (recursive_halving_map_.type != RecursiveHalvingNodeType::SendNeighbor) {
for (int i = 0; i < recursive_halving_map_.k; ++i) {
int target = recursive_halving_map_.ranks[i];
int send_block_start = recursive_halving_map_.send_block_start[i];
int recv_block_start = recursive_halving_map_.recv_block_start[i];
//send
int send_size = 0;
for (int j = 0; j < recursive_halving_map_.send_block_len[i]; ++j) {
send_size += block_len[send_block_start + j];
}
linkers_->Send(target, input, block_start[send_block_start], send_size);
//receive
int need_recv_cnt = 0;
for (int j = 0; j < recursive_halving_map_.recv_block_len[i]; ++j) {
need_recv_cnt += block_len[recv_block_start + j];
}
linkers_->Receive(target, output, 0, need_recv_cnt);
//reduce
reducer(output, input + block_start[recv_block_start], need_recv_cnt);
}
}
int my_reduce_block_idx = rank_;
if (!is_powerof_2) {
if (recursive_halving_map_.type == RecursiveHalvingNodeType::ReciveNeighbor) {
//send result to neighbor
linkers_->Send(recursive_halving_map_.neighbor, input, block_start[recursive_halving_map_.neighbor], block_len[recursive_halving_map_.neighbor]);
}
else if (recursive_halving_map_.type == RecursiveHalvingNodeType::SendNeighbor) {
//receive result from neighbor
int need_recv_cnt = block_len[my_reduce_block_idx];
linkers_->Receive(recursive_halving_map_.neighbor, output, 0, need_recv_cnt);
return;
}
}
std::memcpy(output, input + block_start[my_reduce_block_idx], block_len[my_reduce_block_idx]);
}
}
#endif
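As a side note on the partitioning above: the following is a small, self-contained sketch (not part of this commit) that extracts the block assignment done at the top of Allreduce for a concrete case, so the per-rank block sizes are easy to see. With input_size = 10000 bytes, type_size = 4 and num_machines = 3, the blocks come out as 3336, 3336 and 3328 bytes.

```
#include <cstdio>

int main() {
  const int input_size = 10000, type_size = 4, num_machines = 3;
  int count = input_size / type_size;                    // number of elements
  int step = (count + num_machines - 1) / num_machines;  // ceil(count / num_machines)
  int block_start[num_machines], block_len[num_machines];
  block_start[0] = 0;
  for (int i = 0; i < num_machines - 1; ++i) {
    block_len[i] = step * type_size < input_size - block_start[i]
                       ? step * type_size : input_size - block_start[i];
    block_start[i + 1] = block_start[i] + block_len[i];
  }
  block_len[num_machines - 1] = input_size - block_start[num_machines - 1];
  for (int i = 0; i < num_machines; ++i)
    std::printf("rank %d: start=%d len=%d\n", i, block_start[i], block_len[i]);
  return 0;
}
```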

@ -1,185 +1,185 @@
#ifndef MULTIVERSO_NET_ALLREDUCE_ENGINE_H_
#define MULTIVERSO_NET_ALLREDUCE_ENGINE_H_
#ifdef MULTIVERSO_USE_ZMQ
#include <vector>
namespace multiverso {
/*! \brief forward declaration */
class AllreduceNetWrapper;
/*! \brief Reduce function */
typedef void (ReduceFunction)(const char *src, char *dst, int len);
/*! \brief The network structure for all gather */
class BruckMap {
public:
/*! \brief Number of communication rounds in one all-gather operation */
int k;
/*! \brief in_ranks[i] is the rank to receive from in the i-th round */
std::vector<int> in_ranks;
/*! \brief out_ranks[i] is the rank to send to in the i-th round */
std::vector<int> out_ranks;
BruckMap();
BruckMap(int n);
/*!
* \brief Create the object of bruck map
* \param rank rank of this machine
* \param num_machines The total number of machines
* \return The object of bruck map
*/
static BruckMap Construct(int rank, int num_machines);
};
/*!
* \brief Node type in the recursive halving algorithm.
* When the number of machines is not a power of 2, the machines are grouped into a power-of-2 number of groups,
* and each group contains at most 2 machines.
* If a group has only 1 machine, that machine is a normal node.
* If a group has 2 machines, the group contains two types of nodes, one of which is the leader.
* The leader represents its group and communicates with the other groups.
*/
enum RecursiveHalvingNodeType {
Normal, // normal node: its group has only 1 machine
ReciveNeighbor, // group leader when the group has 2 machines
SendNeighbor // non-leader machine in a 2-machine group
};
/*! \brief Network structure for recursive halving algorithm */
class RecursiveHalvingMap {
public:
/*! \brief Number of communication rounds for one recursive halving operation */
int k;
/*! \brief Node type */
RecursiveHalvingNodeType type;
/*! \brief Neighbor rank, only used for non-normal nodes */
int neighbor;
/*! \brief ranks[i] is the machine to communicate with in the i-th round */
std::vector<int> ranks;
/*! \brief send_block_start[i] is the send block start index in the i-th round */
std::vector<int> send_block_start;
/*! \brief send_block_len[i] is the send block size in the i-th round */
std::vector<int> send_block_len;
/*! \brief recv_block_start[i] is the receive block start index in the i-th round */
std::vector<int> recv_block_start;
/*! \brief recv_block_len[i] is the receive block size in the i-th round */
std::vector<int> recv_block_len;
RecursiveHalvingMap();
RecursiveHalvingMap(RecursiveHalvingNodeType _type, int n);
/*!
* \brief Create the object of recursive halving map
* \param rank rank of this machine
* \param num_machines The total number of machines
* \return The object of recursive halving map
*/
static RecursiveHalvingMap Construct(int rank, int num_machines);
};
/*! \brief A class that contains some collective communication algorithm */
class AllreduceEngine {
public:
AllreduceEngine();
/*!
* \brief Initialize the engine
* \param linkers The low-level communication wrapper
*/
void Init(const AllreduceNetWrapper* linkers);
~AllreduceEngine();
/*! \brief Get rank of this machine */
inline int rank();
/*! \brief Get total number of machines */
inline int num_machines();
/*!
* \brief Perform an allreduce. If the data is small, this calls AllreduceByAllGather; otherwise it calls ReduceScatter followed by Allgather
* \param input Input data
* \param input_size The size of input data
* \param type_size The size of one object in the reduce function
* \param output Output result
* \param reducer Reduce function
*/
void Allreduce(char* input, int input_size, int type_size, char* output, ReduceFunction reducer);
/*!
* \brief Perform an allreduce by all-gather. When the data is small, this reduces the number of communication rounds
* \param input Input data
* \param input_size The size of input data
* \param type_size The size of one object in the reduce function
* \param output Output result
* \param reducer Reduce function
*/
void AllreduceByAllGather(char* input, int input_size, int type_size, char* output, ReduceFunction reducer);
/*!
* \brief Perform an all-gather using the Bruck algorithm. The number of communication rounds is O(log(n)) and the communication cost is O(send_size * num_machines)
* Use this overload when all machines have the same input size
* \param input Input data
* \param send_size The size of input data
* \param output Output result
*/
void Allgather(char* input, int send_size, char* output);
/*!
* \brief Perform an all-gather using the Bruck algorithm. The number of communication rounds is O(log(n)) and the communication cost is O(all_size)
* Use this overload when machines have different input sizes
* \param input Input data
* \param all_size The size of input data
* \param block_start The block start for different machines
* \param block_len The block size for different machines
* \param output Output result
*/
void Allgather(char* input, int all_size, int* block_start, int* block_len, char* output);
/*!
* \brief Perform a reduce-scatter using the recursive halving algorithm. The number of communication rounds is O(log(n)) and the communication cost is O(input_size)
* \param input Input data
* \param input_size The size of input data
* \param type_size The size of one object in the reduce function
* \param block_start The block start for different machines
* \param block_len The block size for different machines
* \param output Output result
* \param reducer Reduce function
*/
void ReduceScatter(char* input, int input_size, int type_size, int* block_start, int* block_len, char* output, ReduceFunction reducer);
private:
/*! \brief Number of all machines */
int num_machines_;
/*! \brief Rank of local machine */
int rank_;
/*! \brief The network interface, provide send/recv functions */
const AllreduceNetWrapper *linkers_;
/*! \brief Bruck map for all gather algorithm*/
BruckMap bruck_map_;
/*! \brief Recursive halving map for reduce scatter */
RecursiveHalvingMap recursive_halving_map_;
/*! \brief Buffer to store block start index */
int* block_start_;
/*! \brief Buffer to store block size */
int* block_len_;
/*! \brief Buffer */
char* buffer_;
/*! \brief Size of buffer_ */
int buffer_size_;
};
inline int AllreduceEngine::rank() {
return rank_;
}
inline int AllreduceEngine::num_machines() {
return num_machines_;
}
}
#endif // MULTIVERSO_USE_ZMQ
#endif //MULTIVERSO_NET_ALLREDUCE_ENGINE_H_
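A note on the ReduceFunction contract declared above, with a minimal sketch that is not part of this commit: judging from how the engine invokes it (e.g. `reducer(buffer_ + block_start_[i], buffer_ + block_start_[0], input_size)`), the reducer folds `src` into `dst` over `len` bytes. An element-wise float sum therefore looks like this:

```
// Minimal example of a reducer matching the ReduceFunction typedef above.
// Convention (inferred from the engine code): accumulate src into dst; len is in bytes.
void SumFloats(const char* src, char* dst, int len) {
  const float* s = reinterpret_cast<const float*>(src);
  float* d = reinterpret_cast<float*>(dst);
  int n = len / static_cast<int>(sizeof(float));
  for (int i = 0; i < n; ++i) d[i] += s[i];
}
```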

@ -1,160 +1,160 @@
#ifdef MULTIVERSO_USE_ZMQ
#include <vector>
#include "allreduce_engine.h"
namespace multiverso {
BruckMap::BruckMap() {
k = 0;
}
BruckMap::BruckMap(int n) {
k = n;
for (int i = 0; i < n; ++i) {
in_ranks.push_back(-1);
out_ranks.push_back(-1);
}
}
BruckMap BruckMap::Construct(int rank, int num_machines) {
int* dist = new int[num_machines];
int k = 0;
for (k = 0; (1 << k) < num_machines; k++) {
dist[k] = 1 << k;
}
BruckMap bruckMap(k);
for (int j = 0; j < k; ++j) {
int ni = (rank + dist[j]) % num_machines;
bruckMap.in_ranks[j] = ni;
ni = (rank - dist[j] + num_machines) % num_machines;
bruckMap.out_ranks[j] = ni;
}
delete[] dist;
return bruckMap;
}
RecursiveHalvingMap::RecursiveHalvingMap() {
k = 0;
}
RecursiveHalvingMap::RecursiveHalvingMap(RecursiveHalvingNodeType _type, int n) {
type = _type;
if (type != RecursiveHalvingNodeType::SendNeighbor) {
k = n;
for (int i = 0; i < n; ++i) {
ranks.push_back(-1);
send_block_start.push_back(-1);
send_block_len.push_back(-1);
recv_block_start.push_back(-1);
recv_block_len.push_back(-1);
}
}
}
RecursiveHalvingMap RecursiveHalvingMap::Construct(int rank, int num_machines) {
std::vector<RecursiveHalvingMap> rec_maps;
for (int i = 0; i < num_machines; ++i) {
rec_maps.push_back(RecursiveHalvingMap());
}
int* distance = new int[num_machines];
RecursiveHalvingNodeType* node_type = new RecursiveHalvingNodeType[num_machines];
int k = 0;
for (k = 0; (1 << k) < num_machines; k++) {
distance[k] = 1 << k;
}
if ((1 << k) == num_machines) {
for (int i = 0; i < k; ++i) {
distance[i] = 1 << (k - 1 - i);
}
for (int i = 0; i < num_machines; ++i) {
rec_maps[i] = RecursiveHalvingMap(RecursiveHalvingNodeType::Normal, k);
for (int j = 0; j < k; ++j) {
int dir = ((i / distance[j]) % 2 == 0) ? 1 : -1;
int ni = i + dir * distance[j];
rec_maps[i].ranks[j] = ni;
int t = i / distance[j];
rec_maps[i].recv_block_start[j] = t * distance[j];
rec_maps[i].recv_block_len[j] = distance[j];
}
}
}
else {
k--;
int lower_power_of_2 = 1 << k;
int rest = num_machines - (1 << k);
for (int i = 0; i < num_machines; ++i) {
node_type[i] = RecursiveHalvingNodeType::Normal;
}
for (int i = 0; i < rest; ++i) {
int r = num_machines - i * 2 - 1;
int l = num_machines - i * 2 - 2;
node_type[l] = RecursiveHalvingNodeType::ReciveNeighbor;
node_type[r] = RecursiveHalvingNodeType::SendNeighbor;
}
for (int i = 0; i < k; ++i) {
distance[i] = 1 << (k - 1 - i);
}
int group_idx = 0;
int* map_len = new int[lower_power_of_2];
int* map_start = new int[lower_power_of_2];
int* group_2_node = new int[lower_power_of_2];
int* node_to_group = new int[num_machines];
for (int i = 0; i < lower_power_of_2; ++i) {
map_len[i] = 0;
}
for (int i = 0; i < num_machines; ++i) {
if (node_type[i] == RecursiveHalvingNodeType::Normal || node_type[i] == RecursiveHalvingNodeType::ReciveNeighbor) {
group_2_node[group_idx++] = i;
}
map_len[group_idx - 1]++;
node_to_group[i] = group_idx - 1;
}
map_start[0] = 0;
for (int i = 1; i < lower_power_of_2; ++i) {
map_start[i] = map_start[i - 1] + map_len[i - 1];
}
for (int i = 0; i < num_machines; ++i) {
if (node_type[i] == RecursiveHalvingNodeType::SendNeighbor) {
rec_maps[i] = RecursiveHalvingMap(RecursiveHalvingNodeType::SendNeighbor, k);
rec_maps[i].neighbor = i - 1;
continue;
}
rec_maps[i] = RecursiveHalvingMap(node_type[i], k);
if (node_type[i] == RecursiveHalvingNodeType::ReciveNeighbor) {
rec_maps[i].neighbor = i + 1;
}
for (int j = 0; j < k; ++j) {
int dir = ((node_to_group[i] / distance[j]) % 2 == 0) ? 1 : -1;
group_idx = group_2_node[(node_to_group[i] + dir * distance[j])];
rec_maps[i].ranks[j] = group_idx;
int t = node_to_group[i] / distance[j];
rec_maps[i].recv_block_start[j] = map_start[t * distance[j]];
int tl = 0;
for (int tmp_i = 0; tmp_i < distance[j]; ++tmp_i) {
tl += map_len[t * distance[j] + tmp_i];
}
rec_maps[i].recv_block_len[j] = tl;
}
}
delete[] map_len;
delete[] map_start;
delete[] group_2_node;
delete[] node_to_group;
}
for (int i = 0; i < num_machines; ++i) {
if (rec_maps[i].type != RecursiveHalvingNodeType::SendNeighbor) {
for (int j = 0; j < k; ++j) {
int target = rec_maps[i].ranks[j];
rec_maps[i].send_block_start[j] = rec_maps[target].recv_block_start[j];
rec_maps[i].send_block_len[j] = rec_maps[target].recv_block_len[j];
}
}
}
delete[] distance;
delete[] node_type;
return rec_maps[rank];
}
}
#endif
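For intuition about the maps constructed above, here is a small illustrative sketch (not part of this commit). It assumes MULTIVERSO_USE_ZMQ is defined and that it links against the code in this diff; "allreduce_engine.h" is the header that declares BruckMap. With 5 machines, Construct chooses k = 3 rounds with distances 1, 2 and 4, so rank 0 should receive from ranks 1, 2, 4 and send to ranks 4, 3, 1 across the three rounds.

```
#include <cstdio>
#include "allreduce_engine.h"  // declares BruckMap (see the header earlier in this diff)

int main() {
  const int num_machines = 5;  // not a power of 2: k = 3 rounds, distances 1, 2, 4
  for (int rank = 0; rank < num_machines; ++rank) {
    multiverso::BruckMap m = multiverso::BruckMap::Construct(rank, num_machines);
    std::printf("rank %d:", rank);
    for (int i = 0; i < m.k; ++i)
      std::printf("  round %d: recv from %d, send to %d;", i, m.in_ranks[i], m.out_ranks[i]);
    std::printf("\n");
  }
  return 0;
}
```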

@ -1,89 +1,89 @@
#ifndef MULTIVERSO_NET_ALLREDUCE_NET_H_
#define MULTIVERSO_NET_ALLREDUCE_NET_H_
#ifdef MULTIVERSO_USE_ZMQ
#include "zmq_net.h"
#include "allreduce_engine.h"
namespace multiverso {
class AllreduceNetWrapper: public ZMQNetWrapper {
public:
void Init(int* argc, char** argv) override {
ZMQNetWrapper::Init(argc, argv);
bruck_map_ = BruckMap::Construct(rank_, size_);
recursive_halving_map_ = RecursiveHalvingMap::Construct(rank_, size_);
allreduce_engine_.Init(this);
}
virtual int Connect(int* ranks, char* endpoints[], int size) override {
int ret = ZMQNetWrapper::Connect(ranks, endpoints, size);
bruck_map_ = BruckMap::Construct(rank_, size_);
recursive_halving_map_ = RecursiveHalvingMap::Construct(rank_, size_);
allreduce_engine_.Init(this);
return ret;  // propagate the connection result from the base class
}
void Finalize() override {
ZMQNetWrapper::Finalize();
}
inline void Receive(int rank, char* data, int start, int len) const {
//note: rank is not used here
int recv_size = 0;
while (recv_size < len) {
int ret_code = zmq_recv(receiver_, data + start + recv_size, len - recv_size, 0);
if (ret_code < 0) { Log::Error("socket receive error %d", ret_code); }
recv_size += ret_code;
}
}
inline void Send(int rank, const char* data, int start, int len) const {
int send_size = 0;
while (send_size < len) {
int ret_code = zmq_send(senders_[rank], data + start + send_size, len - send_size, 0);
if (ret_code < 0) { Log::Error("socket send error %d", ret_code); }
send_size += ret_code;
}
}
int thread_level_support() override {
return NetThreadLevel::THREAD_SERIALIZED;
}
void Allreduce(char* input, int input_size, int type_size, char* output, ReduceFunction reducer) {
allreduce_engine_.Allreduce(input, input_size, type_size, output, reducer);
}
void Allgather(char* input, int send_size, char* output) {
allreduce_engine_.Allgather(input, send_size, output);
}
void Allgather(char* input, int all_size, int* block_start, int* block_len, char* output) {
allreduce_engine_.Allgather(input, all_size, block_start, block_len, output);
}
void ReduceScatter(char* input, int input_size, int type_size, int* block_start, int* block_len, char* output, ReduceFunction reducer) {
allreduce_engine_.ReduceScatter(input, input_size, type_size, block_start, block_len, output, reducer);
}
inline const BruckMap& GetBruckMap() const {
return bruck_map_;
}
inline const RecursiveHalvingMap& GetRecursiveHalfingMap() const {
return recursive_halving_map_;
}
private:
BruckMap bruck_map_;
RecursiveHalvingMap recursive_halving_map_;
AllreduceEngine allreduce_engine_;
};
} // namespace multiverso
#endif // MULTIVERSO_USE_ZMQ
#endif // MULTIVERSO_NET_ALLREDUCE_NET_H_
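As a hypothetical usage sketch (not part of this commit), the wrapper above could drive a float-sum allreduce as follows. It assumes MULTIVERSO_USE_ZMQ is defined, that "net_allreduce.h" (the include name used in allreduce_engine.cc above) declares AllreduceNetWrapper, and that the wrapper has already been set up via Init() or Connect().

```
#include <vector>
#include "net_allreduce.h"  // include name taken from allreduce_engine.cc above

// Accumulate src into dst, len bytes of floats (same contract as ReduceFunction).
static void SumFloats(const char* src, char* dst, int len) {
  const float* s = reinterpret_cast<const float*>(src);
  float* d = reinterpret_cast<float*>(dst);
  for (int i = 0; i < len / static_cast<int>(sizeof(float)); ++i) d[i] += s[i];
}

// `net` is an AllreduceNetWrapper that Init() or Connect() has already initialized.
void SumAcrossMachines(multiverso::AllreduceNetWrapper& net) {
  std::vector<float> local(4096, 1.0f);   // every rank contributes 1.0 per element
  std::vector<float> global(4096, 0.0f);  // receives the element-wise sum
  net.Allreduce(reinterpret_cast<char*>(local.data()),
                static_cast<int>(local.size() * sizeof(float)),
                sizeof(float),
                reinterpret_cast<char*>(global.data()),
                SumFloats);
  // On every rank, global[i] now equals the number of machines.
}
```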

next/.gitignore vendored

@ -1,268 +0,0 @@
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
# User-specific files
*.suo
*.user
*.userosscache
*.sln.docstates
# User-specific files (MonoDevelop/Xamarin Studio)
*.userprefs
# Build results
[Dd]ebug/
[Dd]ebugPublic/
[Rr]elease/
[Rr]eleases/
x64/
x86/
bld/
[Bb]in/
[Oo]bj/
[Ll]og/
[Bb]uild/
# Visual Studio 2015 cache/options directory
.vs/
# Uncomment if you have tasks that create the project's static files in wwwroot
#wwwroot/
# MSTest test Results
[Tt]est[Rr]esult*/
[Bb]uild[Ll]og.*
# NUNIT
*.VisualState.xml
TestResult.xml
# Build Results of an ATL Project
[Dd]ebugPS/
[Rr]eleasePS/
dlldata.c
# DNX
project.lock.json
artifacts/
*_i.c
*_p.c
*_i.h
*.ilk
*.meta
*.obj
*.pch
*.pdb
*.pgc
*.pgd
*.rsp
*.sbr
*.tlb
*.tli
*.tlh
*.tmp
*.tmp_proj
*.log
*.vspscc
*.vssscc
.builds
*.pidb
*.svclog
*.scc
# Chutzpah Test files
_Chutzpah*
# Visual C++ cache files
ipch/
*.aps
*.ncb
*.opendb
*.opensdf
*.sdf
*.cachefile
# Visual Studio profiler
*.psess
*.vsp
*.vspx
*.sap
# TFS 2012 Local Workspace
$tf/
# Guidance Automation Toolkit
*.gpState
# ReSharper is a .NET coding add-in
_ReSharper*/
*.[Rr]e[Ss]harper
*.DotSettings.user
# JustCode is a .NET coding add-in
.JustCode
# TeamCity is a build add-in
_TeamCity*
# DotCover is a Code Coverage Tool
*.dotCover
# NCrunch
_NCrunch_*
.*crunch*.local.xml
nCrunchTemp_*
# MightyMoose
*.mm.*
AutoTest.Net/
# Web workbench (sass)
.sass-cache/
# Installshield output folder
[Ee]xpress/
# DocProject is a documentation generator add-in
DocProject/buildhelp/
DocProject/Help/*.HxT
DocProject/Help/*.HxC
DocProject/Help/*.hhc
DocProject/Help/*.hhk
DocProject/Help/*.hhp
DocProject/Help/Html2
DocProject/Help/html
# Click-Once directory
publish/
# Publish Web Output
*.[Pp]ublish.xml
*.azurePubxml
# TODO: Comment the next line if you want to checkin your web deploy settings
# but database connection strings (with potential passwords) will be unencrypted
*.pubxml
*.publishproj
# NuGet Packages
*.nupkg
# The packages folder can be ignored because of Package Restore
**/packages/*
# except build/, which is used as an MSBuild target.
!**/packages/build/
# Uncomment if necessary however generally it will be regenerated when needed
#!**/packages/repositories.config
# NuGet v3's project.json files produces more ignoreable files
*.nuget.props
*.nuget.targets
# Microsoft Azure Build Output
csx/
*.build.csdef
# Microsoft Azure Emulator
ecf/
rcf/
# Microsoft Azure ApplicationInsights config file
ApplicationInsights.config
# Windows Store app package directory
AppPackages/
BundleArtifacts/
# Visual Studio cache files
# files ending in .cache can be ignored
*.[Cc]ache
# but keep track of directories ending in .cache
!*.[Cc]ache/
# Others
ClientBin/
~$*
*~
*.dbmdl
*.dbproj.schemaview
*.pfx
*.publishsettings
node_modules/
orleans.codegen.cs
# RIA/Silverlight projects
Generated_Code/
# Backup & report files from converting an old project file
# to a newer Visual Studio version. Backup files are not needed,
# because we have git ;-)
_UpgradeReport_Files/
Backup*/
UpgradeLog*.XML
UpgradeLog*.htm
# SQL Server files
*.mdf
*.ldf
# Business Intelligence projects
*.rdl.data
*.bim.layout
*.bim_*.settings
# Microsoft Fakes
FakesAssemblies/
# GhostDoc plugin setting file
*.GhostDoc.xml
# Node.js Tools for Visual Studio
.ntvs_analysis.dat
# Visual Studio 6 build log
*.plg
# Visual Studio 6 workspace options file
*.opt
# Visual Studio LightSwitch build output
**/*.HTMLClient/GeneratedArtifacts
**/*.DesktopClient/GeneratedArtifacts
**/*.DesktopClient/ModelManifest.xml
**/*.Server/GeneratedArtifacts
**/*.Server/ModelManifest.xml
_Pvt_Extensions
# Paket dependency manager
.paket/paket.exe
# FAKE - F# Make
.fake/
# Compiled Object files
*.slo
*.lo
*.o
*.obj
# Precompiled Headers
*.gch
*.pch
# Compiled Dynamic libraries
*.so
*.dylib
*.dll
# Fortran module files
*.mod
# Compiled Static libraries
*.lai
*.la
*.a
*.lib
# Executables
*.exe
*.out
*.app

@ -1,28 +0,0 @@
sudo: required
# use 'only' to specify which branches to build
# or use 'except' to define a blacklist
branches:
only:
- dev
# specify recipients that will be notified about build results
# by default, notifications go to the committer and the author
#notifications:
# email:
# recipients:
# - v-limhua@microsoft.com
language: cpp
#use docker
services:
- docker
#build docker
install:
- docker build -t multiverso .
#run with the test.sh
script:
- docker run -t -i multiverso /etc/bootstrap.sh

@ -1,117 +0,0 @@
# Creates hadoop-2.6.0 zeromq-4.1.3 mpich-3.0.4 jdk1.8.0_65 dmtk-ftrl on Ubuntu 14.04
#
# docker build -t lyysdy/dmtk_ftrl .
FROM ubuntu:14.04
MAINTAINER lyysdy@foxmail.com
USER root
# install dev tools
RUN apt-get update \
&& apt-get install -qqy curl tar g++-4.8 gcc \
libtool pkg-config autoconf openssh-server openssh-client rsync build-essential automake \
vim
# passwordless ssh
RUN echo "syntax enable\nset nu\nset hlsearch\nset showmatch\nset shiftwidth=4\nset mouse=a\nset tabstop=4" >> ~/.vimrc \
&& rm -f /etc/ssh/ssh_host_dsa_key /etc/ssh/ssh_host_rsa_key /root/.ssh/id_rsa \
&& ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_dsa_key \
&& ssh-keygen -q -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key \
&& ssh-keygen -q -N "" -t rsa -f /root/.ssh/id_rsa \
&& cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
# Cmake
RUN curl -sc 0 https://cmake.org/files/v3.4/cmake-3.4.1-Linux-x86_64.tar.gz | tar -xz -C /usr/local \
&& cd /usr/local \
&& ln -s ./cmake-3.4.1-Linux-x86_64 cmake
ENV PATH $PATH:/usr/local/cmake/bin
# Build ZeroMQ
# Make sure that libtool, pkg-config, build-essential, autoconf and automake
# are installed.
RUN wget -qc -t 0 http://download.zeromq.org/zeromq-4.1.3.tar.gz \
&& tar -zxvf zeromq-4.1.3.tar.gz \
&& rm zeromq-4.1.3.tar.gz \
&& cd zeromq-4.1.3 \
&& ./configure --prefix=/usr/local --without-libsodium \
&& make -j4 \
&& make install -j4 \
&& cd .. \
&& rm -rf zeromq-4.1.3
# Get the C++ Wrapper zmq.hpp
RUN wget https://raw.githubusercontent.com/zeromq/cppzmq/master/zmq.hpp \
&& mv zmq.hpp /usr/local/include
#
# Get MPICH2
RUN curl -s http://www.mpich.org/static/downloads/3.0.4/mpich-3.0.4.tar.gz | tar -zx \
&& cd mpich-3.0.4 \
&& ./configure --prefix=/usr/local --disable-fc --disable-f77 \
&& make -j4 \
&& make install -j4 \
&& cd .. \
&& rm -rf mpich-3.0.4
# java
RUN mkdir -p /usr/local/java/default && \
curl -Ls 'http://download.oracle.com/otn-pub/java/jdk/8u65-b17/jdk-8u65-linux-x64.tar.gz' -H 'Cookie: oraclelicense=accept-securebackup-cookie' | \
tar --strip-components=1 -xz -C /usr/local/java/default/
ENV JAVA_HOME /usr/local/java/default/
ENV PATH $PATH:$JAVA_HOME/bin
# hadoop
RUN wget -cq -t 0 http://www.eu.apache.org/dist/hadoop/common/hadoop-2.6.0/hadoop-2.6.0.tar.gz
RUN tar -xz -C /usr/local/ -f hadoop-2.6.0.tar.gz \
&& rm hadoop-2.6.0.tar.gz \
&& cd /usr/local && ln -s ./hadoop-2.6.0 hadoop \
&& cp -r /usr/local/hadoop/include/* /usr/local/include
ENV HADOOP_PREFIX /usr/local/hadoop
RUN sed -i '/^export JAVA_HOME/ s:.*:export JAVA_HOME=/usr/local/java/default\nexport HADOOP_PREFIX=/usr/local/hadoop\nexport HADOOP_HOME=/usr/local/hadoop\n:' $HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
RUN sed -i '/^export HADOOP_CONF_DIR/ s:.*:export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop/:' $HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
# fixing the libhadoop.so like a boss
RUN rm /usr/local/hadoop/lib/native/* \
&& curl -Ls http://dl.bintray.com/sequenceiq/sequenceiq-bin/hadoop-native-64-2.6.0.tar | tar -x -C /usr/local/hadoop/lib/native/
ADD docker-ubuntu/core-site.xml.template $HADOOP_PREFIX/etc/hadoop/core-site.xml.template
RUN sed s/HOSTNAME/master.dango.work/ /usr/local/hadoop/etc/hadoop/core-site.xml.template > /usr/local/hadoop/etc/hadoop/core-site.xml
ADD docker-ubuntu/ssh_config /root/.ssh/config
RUN chmod 600 /root/.ssh/config \
&& chown root:root /root/.ssh/config
# prepare bootstrap.sh
ADD docker-ubuntu/bootstrap.sh /etc/bootstrap.sh
RUN chown root:root /etc/bootstrap.sh \
&& chmod 700 /etc/bootstrap.sh
ENV BOOTSTRAP /etc/bootstrap.sh
# working around docker.io build error
RUN ls -la /usr/local/hadoop/etc/hadoop/*-env.sh \
&& chmod +x /usr/local/hadoop/etc/hadoop/*-env.sh \
&& ls -la /usr/local/hadoop/etc/hadoop/*-env.sh
# fix the 254 error code
RUN sed -i "/^[^#]*UsePAM/ s/.*/#&/" /etc/ssh/sshd_config \
&& echo "UsePAM no" >> /etc/ssh/sshd_config \
&& echo "Port 2122" >> /etc/ssh/sshd_config
RUN apt-get update
RUN apt-get install -qqy gdb
# compile multiverso
ADD . /project
RUN mkdir -p /project/build \
&& cd /project/build \
&& cmake -DUSE_HDFS=OFF .. \
&& make
# clean unnecessary packages
RUN apt-get clean \
&& rm -rf /var/lib/apt/lists/*
CMD ["/etc/bootstrap.sh", "ls"]

@ -1,3 +0,0 @@
# Multiverso
New version

File diff not shown because it is too large.

@ -1,53 +0,0 @@
# Apache Hadoop 2.6.0 Docker image
A few weeks ago we released an Apache Hadoop 2.3 Docker image (using CentOS 6.5 as the guest OS) - it quickly became the most [popular](https://registry.hub.docker.com/search?q=hadoop&s=downloads) Hadoop image in the Docker [registry](https://registry.hub.docker.com/).
Following the success of our CentOS based Hadoop 2.3 Docker [image](https://registry.hub.docker.com/u/sequenceiq/hadoop-docker/), the feedback and feature requests we received aligned with the Hadoop release cycle, so we have released an Apache Hadoop 2.6.0 Docker image on Ubuntu 14.04 as well - same as the previous version, it's available as a trusted and automated build on the official Docker [registry](https://registry.hub.docker.com/).
_FYI: All the former Hadoop releases (2.3, 2.4.0, 2.4.1, 2.5.0, 2.5.1, 2.5.2, 2.6.0) are available in the GitHub branches or our [Docker Registry](https://registry.hub.docker.com/u/sequenceiq/hadoop-ubuntu/) - check the tags._
# Build the image
If you'd like to try directly from the Dockerfile you can build the image as:
```
docker build -t sequenceiq/hadoop-ubuntu:2.6.0 .
```
# Pull the image
The image is also released as an official Docker image from Docker's automated build repository - you can always pull or refer to the image when launching containers.
```
docker pull sequenceiq/hadoop-ubuntu:2.6.0
```
# Start a container
In order to use the Docker image you have just built or pulled, use:
```
docker run -i -t sequenceiq/hadoop-ubuntu:2.6.0 /etc/bootstrap.sh -bash
```
## Testing
You can run one of the stock examples:
```
cd $HADOOP_PREFIX
# run the mapreduce
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar grep input output 'dfs[a-z.]+'
# check the output
bin/hdfs dfs -cat output/*
```
## Hadoop native libraries, build, Bintray, etc
The Hadoop build process is no easy task - it requires lots of libraries at the right versions, protobuf, etc. and takes some time - we have simplified all this, done the build, and released a 64-bit version of the Hadoop native libs on this [Bintray repo](https://bintray.com/sequenceiq/sequenceiq-bin/hadoop-native-64bit/2.5.0/view/files). Enjoy.
## Automate everything
As we have mentioned previously, a Dockerfile was created and released in the official [Docker repository](https://registry.hub.docker.com/u/sequenceiq/hadoop-ubuntu/).

@ -1,14 +0,0 @@
#!/bin/bash
: ${HADOOP_PREFIX:=/usr/local/hadoop}
$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
# installing libraries if any - (resource urls added comma separated to the ACP system variable)
cd $HADOOP_PREFIX/share/hadoop/common ; for cp in ${ACP//,/ }; do echo == $cp; curl -LO $cp ; done;
export PATH=$PATH:${HADOOP_PREFIX}/bin
export CLASSPATH=/usr/local/hadoop/lib/native/*:`hadoop classpath --glob`:/usr/local/java/default/lib/*.jar
/bin/bash /project/test.sh

@ -1,6 +0,0 @@
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://HOSTNAME:9000</value>
</property>
</configuration>

@ -1,15 +0,0 @@
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.rpc-bind-host</name>
<value>0.0.0.0</value>
</property>
<property>
<name>dfs.namenode.servicerpc-bind-host</name>
<value>0.0.0.0</value>
</property>
</configuration>

@ -1,6 +0,0 @@
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>

@ -1,5 +0,0 @@
Host *
UserKnownHostsFile /dev/null
StrictHostKeyChecking no
LogLevel quiet
Port 2122

@ -1,32 +0,0 @@
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.application.classpath</name>
<value>/usr/local/hadoop/etc/hadoop, /usr/local/hadoop/share/hadoop/common/*, /usr/local/hadoop/share/hadoop/common/lib/*, /usr/local/hadoop/share/hadoop/hdfs/*, /usr/local/hadoop/share/hadoop/hdfs/lib/*, /usr/local/hadoop/share/hadoop/mapreduce/*, /usr/local/hadoop/share/hadoop/mapreduce/lib/*, /usr/local/hadoop/share/hadoop/yarn/*, /usr/local/hadoop/share/hadoop/yarn/lib/*</value>
</property>
<property>
<description>
Number of seconds after an application finishes before the nodemanager's
DeletionService will delete the application's localized file directory
and log directory.
To diagnose Yarn application problems, set this property's value large
enough (for example, to 600 = 10 minutes) to permit examination of these
directories. After changing the property's value, you must restart the
nodemanager in order for it to have an effect.
The roots of Yarn applications' work directories are configurable with
the yarn.nodemanager.local-dirs property (see below), and the roots
of the Yarn applications' log directories are configurable with the
yarn.nodemanager.log-dirs property (see also below).
</description>
<name>yarn.nodemanager.delete.debug-delay-sec</name>
<value>600</value>
</property>
</configuration>

@ -1,64 +0,0 @@
#ifndef MULTIVERSO_UPDATER_SECOND_ORDER_GRADIENT_UPDATER_H_
#define MULTIVERSO_UPDATER_SECOND_ORDER_GRADIENT_UPDATER_H_
#include <cmath>
#include <cstdint>  // std::uint32_t (used by QuakeRsqrt)
#include <cstring>  // std::memcpy (used by QuakeRsqrt)
#include <vector>
#include "updater.h"
namespace multiverso {
// [TODO(qiwye)]:rename the class to Shuxin Zheng's algorithms
template <typename T>
class SecondOrderUpdater : public Updater<T> {
public:
explicit SecondOrderUpdater(size_t size) :
size_(size) {
Log::Debug("[SecondOrderUpdater] Init with size = %d. \n", size_);
shadow_copies_.resize(MV_NumWorkers(), std::vector<T>(size_));
historic_g_sqr_.resize(MV_NumWorkers(), std::vector<T>(size_));
}
void Update(size_t num_element, T*data, T*delta,
UpdateOption* option, size_t offset) override {
auto& g_sqr_data_ = historic_g_sqr_.at(option->worker_id());  // reference, so updates persist across calls
auto& copies_data_ = shadow_copies_.at(option->worker_id());  // reference, so updates persist across calls
for (size_t index = 0; index < num_element; ++index) {
// gsqr = (1 - r) * g^2 + r * gsqr
g_sqr_data_[index + offset] =
(1 - option->rho()) * delta[index] * delta[index]
/ option->learning_rate() / option->learning_rate() +
option->rho() * g_sqr_data_[index + offset];
data[index + offset] -= delta[index] + option->lambda() *
std::sqrt(g_sqr_data_[index + offset]) *
(data[index + offset] - copies_data_[index + offset]);
// caching each worker's latest version of parameter
copies_data_[index + offset] = data[index + offset];
}
}
~SecondOrderUpdater() {
shadow_copies_.clear();
historic_g_sqr_.clear();
}
private:
float QuakeRsqrt(float number) {
float x = number * 0.5f, y = number;
std::uint32_t i;
std::memcpy(&i, &y, sizeof(float));
i = 0x5f3759df - (i >> 1);
std::memcpy(&y, &i, sizeof(float));
return y * (1.5f - (x * y * y));
}
protected:
std::vector< std::vector<T>> shadow_copies_;
std::vector< std::vector<T>> historic_g_sqr_;
size_t size_;
};
} // namespace multiverso
#endif // MULTIVERSO_UPDATER_SECOND_ORDER_GRADIENT_UPDATER_H_
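To make the update rule in SecondOrderUpdater::Update explicit, here is a minimal scalar sketch (not part of this commit) of one step for a single parameter; rho, lambda and lr stand for option->rho(), option->lambda() and option->learning_rate().

```
#include <cmath>

// One scalar step of the update above. g_sqr and shadow_copy are this worker's
// persistent per-parameter state; delta is the learning-rate-scaled gradient.
float SecondOrderStep(float data, float delta, float& g_sqr, float& shadow_copy,
                      float rho, float lambda, float lr) {
  // Decayed average of the squared raw gradient g = delta / lr:
  // g_sqr = (1 - rho) * g^2 + rho * g_sqr
  g_sqr = (1 - rho) * (delta / lr) * (delta / lr) + rho * g_sqr;
  // Apply delta plus a correction that pulls the parameter toward this worker's
  // last-seen copy, scaled by lambda * sqrt(g_sqr).
  data -= delta + lambda * std::sqrt(g_sqr) * (data - shadow_copy);
  shadow_copy = data;  // cache the version this worker has now seen
  return data;
}
```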

@ -1,4 +0,0 @@
#build docker
sudo docker build -t multiverso-next .
#run test.sh
sudo docker run -t -i multiverso-next /etc/bootstrap.sh

@ -1,4 +0,0 @@
mpiexec /project/build/bin/multiverso.test
mpiexec /project/build/bin/multiverso.test matrix
mpiexec /project/build/bin/multiverso.test checkpoint
mpiexec /project/build/bin/multiverso.test restore

next/src/.gitignore → src/.gitignore vendored