use dataset_loader to load data

This commit is contained in:
Guolin Ke 2016-11-04 22:38:07 +08:00
Родитель 8696709e91
Коммит 1c08e71e50
11 изменённых файлов: 898 добавлений и 1067 удалений

Просмотреть файл

@ -8,6 +8,7 @@
namespace LightGBM {
class DatasetLoader;
class Dataset;
class Boosting;
class ObjectiveFunction;
@ -59,6 +60,8 @@ private:
/*! \brief All configs */
OverallConfig config_;
/*! \brief Dataset loader */
DatasetLoader* dataset_loader_;
/*! \brief Training data */
Dataset* train_data_;
/*! \brief Validation data */

Просмотреть файл

@ -93,7 +93,6 @@ public:
std::string output_model = "LightGBM_model.txt";
std::string output_result = "LightGBM_predict_result.txt";
std::string input_model = "";
std::string input_init_score = "";
int verbosity = 1;
int num_model_predict = -1;
bool is_pre_partition = false;
@ -318,7 +317,6 @@ struct ParameterAlias {
{ "model_out", "output_model" },
{ "model_input", "input_model" },
{ "model_in", "input_model" },
{ "init_score", "input_init_score"},
{ "predict_result", "output_result" },
{ "prediction_result", "output_result" },
{ "valid", "valid_data" },

Просмотреть файл

@ -1,5 +1,5 @@
#ifndef LIGHTGBM_DATA_H_
#define LIGHTGBM_DATA_H_
#ifndef LIGHTGBM_DATASET_H_
#define LIGHTGBM_DATASET_H_
#include <LightGBM/utils/random.h>
#include <LightGBM/utils/text_reader.h>
@ -18,6 +18,7 @@ namespace LightGBM {
/*! \brief forward declaration */
class Feature;
class BinMapper;
class DatasetLoader;
/*!
* \brief This class is used to store some meta(non-feature) data for training data,
@ -44,13 +45,7 @@ public:
* \param init_score_filename Filename of initial score
* \param num_class Number of classes
*/
void Init(const char* data_filename, const char* init_score_filename, const int num_class);
/*!
* \brief Initialize, only load initial score
* \param init_score_filename Filename of initial score
* \param num_class Number of classes
*/
void Init(const char* init_score_filename, const int num_class);
void Init(const char* data_filename, const int num_class);
/*!
* \brief Initial with binary memory
* \param memory Pointer to memory
@ -178,10 +173,9 @@ public:
*/
inline const float* init_score() const { return init_score_; }
private:
/*! \brief Load initial scores from file */
void LoadInitialScore();
private:
/*! \brief Load wights from file */
void LoadWeights();
/*! \brief Load query boundaries from file */
@ -190,8 +184,6 @@ private:
void LoadQueryWeights();
/*! \brief Filename of current data */
const char* data_filename_;
/*! \brief Filename of initial scores */
const char* init_score_filename_;
/*! \brief Number of data */
data_size_t num_data_;
/*! \brief Number of classes */
@ -251,79 +243,16 @@ using PredictFunction =
*/
class Dataset {
public:
/*!
* \brief Constructor
* \param data_filename Filename of dataset
* \param init_score_filename Filename of initial score
* \param io_config configs for IO
* \param predict_fun Used for initial model, will give a prediction score based on this function, then set as initial score
*/
Dataset(const char* data_filename, const char* init_score_filename,
const IOConfig& io_config, const PredictFunction& predict_fun);
friend DatasetLoader;
/*!
* \brief Constructor
* \param data_filename Filename of dataset
* \param io_config configs for IO
* \param predict_fun Used for initial model, will give a prediction score based on this function, then set as initial score
*/
Dataset(const char* data_filename,
const IOConfig& io_config, const PredictFunction& predict_fun)
: Dataset(data_filename, "", io_config, predict_fun) {
}
/*!
* \brief Constructor, without filename, used to load data from memory
* \param io_config configs for IO
* \param predict_fun Used for initial model, will give a prediction score based on this function, then set as initial score
*/
Dataset(const IOConfig& io_config, const PredictFunction& predict_fun);
Dataset();
/*! \brief Destructor */
~Dataset();
/*! \brief Init Dataset with specific binmapper */
void InitByBinMapper(std::vector<const BinMapper*> bin_mappers, data_size_t num_data);
/*! \brief push raw data into dataset */
void PushData(const std::vector<std::vector<std::pair<int, float>>>& datas, data_size_t start_idx, bool is_finished);
void SetField(const char* field_name, const void* field_data, data_size_t num_element, int type);
/*!
* \brief Load training data on parallel training
* \param rank Rank of local machine
* \param num_machines Total number of all machines
* \param is_pre_partition True if data file is pre-partitioned
* \param use_two_round_loading True if need to use two round loading
*/
void LoadTrainData(int rank, int num_machines, bool is_pre_partition,
bool use_two_round_loading);
/*!
* \brief Load training data on single machine training
* \param use_two_round_loading True if need to use two round loading
*/
inline void LoadTrainData(bool use_two_round_loading) {
LoadTrainData(0, 1, false, use_two_round_loading);
}
/*!
* \brief Load data and use bin mapper from other data set, general this function is used to extract feature for validation data
* \param train_set Other loaded data set
* \param use_two_round_loading True if need to use two round loading
*/
void LoadValidationData(const Dataset* train_set, bool use_two_round_loading);
/*!
* \brief Load data set from binary file
* \param bin_filename filename of bin data
* \param rank Rank of local machine
* \param num_machines Total number of all machines
* \param is_pre_partition True if data file is pre-partitioned
*/
void LoadDataFromBinFile(const char* bin_filename, int rank, int num_machines, bool is_pre_partition);
/*!
* \brief Save current dataset into binary file, will save to "filename.bin"
*/
@ -331,6 +260,8 @@ public:
std::vector<const BinMapper*> GetBinMappers() const;
void CopyFeatureMetadataTo(Dataset *dataset, bool is_enable_sparse) const;
/*!
* \brief Get a feature pointer for specific index
* \param i Index for feature
@ -365,57 +296,7 @@ public:
Dataset(const Dataset&) = delete;
private:
/*!
* \brief Load data content on memory. if num_machines > 1 and !is_pre_partition, will partition data
* \param rank Rank of local machine
* \param num_machines Total number of all machines
* \param is_pre_partition True if data file is pre-partitioned
*/
void LoadDataToMemory(int rank, int num_machines, bool is_pre_partition);
/*!
* \brief Sample data from memory, need load data to memory first
* \param out_data Store the sampled data
*/
void SampleDataFromMemory(std::vector<std::string>* out_data);
/*!
* \brief Sample data from file
* \param rank Rank of local machine
* \param num_machines Total number of all machines
* \param is_pre_partition True if data file is pre-partitioned
* \param out_data Store the sampled data
*/
void SampleDataFromFile(int rank, int num_machines,
bool is_pre_partition, std::vector<std::string>* out_data);
/*!
* \brief Get feature bin mapper from sampled data.
* if num_machines > 1, differnt machines will construct bin mapper for different features, then have a global sync up
* \param rank Rank of local machine
* \param num_machines Total number of all machines
*/
void ConstructBinMappers(int rank, int num_machines,
const std::vector<std::string>& sample_data);
/*! \brief Extract local features from memory */
void ExtractFeaturesFromMemory();
/*! \brief Extract local features from file */
void ExtractFeaturesFromFile();
/*! \brief Check can load from binary file */
void CheckCanLoadFromBin();
/*! \brief Check this data set is null or not */
void CheckDataset();
/*! \brief Filename of data */
const char* data_filename_;
/*! \brief A reader class that can read text data */
TextReader<data_size_t>* text_reader_;
/*! \brief A parser class that can parse data */
Parser* parser_;
/*! \brief Store used features */
std::vector<Feature*> features_;
/*! \brief Mapper from real feature index to used index*/
@ -430,32 +311,12 @@ private:
int num_class_;
/*! \brief Store some label level data*/
Metadata metadata_;
/*! \brief Random generator*/
Random random_;
/*! \brief The maximal number of bin that feature values will bucket in */
int max_bin_;
/*! \brief True if enable sparse */
bool is_enable_sparse_;
/*! \brief True if dataset is loaded from binary file */
bool is_loading_from_binfile_;
/*! \brief Number of global data, used for distributed learning */
size_t global_num_data_ = 0;
/*! \brief used to local used data indices */
std::vector<data_size_t> used_data_indices_;
/*! \brief prediction function for initial model */
const PredictFunction& predict_fun_;
/*! \brief index of label column */
int label_idx_ = 0;
/*! \brief index of weight column */
int weight_idx_ = -1;
/*! \brief index of group column */
int group_idx_ = -1;
/*! \brief Mapper from real feature index to used index*/
std::unordered_set<int> ignore_features_;
/*! \brief store feature names */
std::vector<std::string> feature_names_;
/*! \brief store feature names */
int bin_construct_sample_cnt_;
};
} // namespace LightGBM

Просмотреть файл

@ -5,6 +5,7 @@
#include <LightGBM/network.h>
#include <LightGBM/dataset.h>
#include <LightGBM/dataset_loader.h>
#include <LightGBM/boosting.h>
#include <LightGBM/objective_function.h>
#include <LightGBM/metric.h>
@ -26,7 +27,7 @@
namespace LightGBM {
Application::Application(int argc, char** argv)
:train_data_(nullptr), boosting_(nullptr), objective_fun_(nullptr) {
:dataset_loader_(nullptr), train_data_(nullptr), boosting_(nullptr), objective_fun_(nullptr) {
LoadParameters(argc, argv);
// set number of threads for openmp
if (config_.num_threads > 0) {
@ -35,6 +36,7 @@ Application::Application(int argc, char** argv)
}
Application::~Application() {
if (dataset_loader_ != nullptr) { delete dataset_loader_; }
if (train_data_ != nullptr) { delete train_data_; }
for (auto& data : valid_datas_) {
if (data != nullptr) { delete data; }
@ -141,19 +143,17 @@ void Application::LoadData() {
config_.io_config.data_random_seed =
GlobalSyncUpByMin<int>(config_.io_config.data_random_seed);
}
train_data_ = new Dataset(config_.io_config.data_filename.c_str(),
config_.io_config.input_init_score.c_str(),
config_.io_config,
predict_fun);
dataset_loader_ = new DatasetLoader(config_.io_config, predict_fun);
dataset_loader_->SetHeadder(config_.io_config.data_filename.c_str());
// load Training data
if (config_.is_parallel_find_bin) {
// load data for parallel training
train_data_->LoadTrainData(Network::rank(), Network::num_machines(),
config_.io_config.is_pre_partition,
config_.io_config.use_two_round_loading);
train_data_ = dataset_loader_->LoadFromFile(config_.io_config.data_filename.c_str(),
Network::rank(), Network::num_machines());
} else {
// load data for single machine
train_data_->LoadTrainData(config_.io_config.use_two_round_loading);
train_data_ = dataset_loader_->LoadFromFile(config_.io_config.data_filename.c_str(), 0, 1);
}
// need save binary file
if (config_.io_config.is_save_binary_file) {
@ -173,13 +173,8 @@ void Application::LoadData() {
// Add validation data, if it exists
for (size_t i = 0; i < config_.io_config.valid_data_filenames.size(); ++i) {
// add
valid_datas_.push_back(
new Dataset(config_.io_config.valid_data_filenames[i].c_str(),
config_.io_config,
predict_fun));
// load validation data like train data
valid_datas_.back()->LoadValidationData(train_data_,
config_.io_config.use_two_round_loading);
valid_datas_.push_back(dataset_loader_->LoadFromFileLikeOthers(config_.io_config.valid_data_filenames[i].c_str(),
train_data_));
// need save binary file
if (config_.io_config.is_save_binary_file) {
valid_datas_.back()->SaveBinaryFile(nullptr);

Просмотреть файл

@ -1,4 +1,3 @@
#include <LightGBM/c_api.h>
#include <LightGBM/dataset.h>
#include <LightGBM/boosting.h>
@ -10,6 +9,7 @@
#include <vector>
#include <string>
#include <cstring>
#include <memory>
namespace LightGBM {
@ -100,3 +100,6 @@ private:
};
}
using namespace LightGBM;

Просмотреть файл

@ -202,7 +202,6 @@ void IOConfig::Set(const std::unordered_map<std::string, std::string>& params) {
GetString(params, "output_model", &output_model);
GetString(params, "input_model", &input_model);
GetString(params, "output_result", &output_result);
GetString(params, "input_init_score", &input_init_score);
std::string tmp_str = "";
if (GetString(params, "valid_data", &tmp_str)) {
valid_data_filenames = Common::Split(tmp_str.c_str(), ',');

Просмотреть файл

@ -15,295 +15,32 @@
namespace LightGBM {
Dataset::Dataset(const char* data_filename, const char* init_score_filename,
const IOConfig& io_config, const PredictFunction& predict_fun)
:data_filename_(data_filename), random_(io_config.data_random_seed),
max_bin_(io_config.max_bin), is_enable_sparse_(io_config.is_enable_sparse),
predict_fun_(predict_fun), bin_construct_sample_cnt_(io_config.bin_construct_sample_cnt) {
num_class_ = io_config.num_class;
if (io_config.enable_load_from_binary_file) {
CheckCanLoadFromBin();
}
if (is_loading_from_binfile_ && predict_fun != nullptr) {
Log::Info("Cannot initialize prediction by using a binary file, using text file instead");
Dataset::Dataset() {
num_class_ = 1;
num_data_ = 0;
is_loading_from_binfile_ = false;
}
if (!is_loading_from_binfile_) {
// load weight, query information and initialize score
metadata_.Init(data_filename, init_score_filename, num_class_);
// create text reader
text_reader_ = new TextReader<data_size_t>(data_filename, io_config.has_header);
std::unordered_map<std::string, int> name2idx;
// get column names
if (io_config.has_header) {
std::string first_line = text_reader_->first_line();
feature_names_ = Common::Split(first_line.c_str(), "\t ,");
for (size_t i = 0; i < feature_names_.size(); ++i) {
name2idx[feature_names_[i]] = static_cast<int>(i);
}
}
std::string name_prefix("name:");
// load label idx
if (io_config.label_column.size() > 0) {
if (Common::StartsWith(io_config.label_column, name_prefix)) {
std::string name = io_config.label_column.substr(name_prefix.size());
if (name2idx.count(name) > 0) {
label_idx_ = name2idx[name];
Log::Info("Using column %s as label", name.c_str());
} else {
Log::Fatal("Could not find label column %s in data file", name.c_str());
}
} else {
if (!Common::AtoiAndCheck(io_config.label_column.c_str(), &label_idx_)) {
Log::Fatal("label_column is not a number, \
if you want to use a column name, \
please add the prefix \"name:\" to the column name");
}
Log::Info("Using column number %d as label", label_idx_);
}
}
if (feature_names_.size() > 0) {
// erase label column name
feature_names_.erase(feature_names_.begin() + label_idx_);
}
// load ignore columns
if (io_config.ignore_column.size() > 0) {
if (Common::StartsWith(io_config.ignore_column, name_prefix)) {
std::string names = io_config.ignore_column.substr(name_prefix.size());
for (auto name : Common::Split(names.c_str(), ',')) {
if (name2idx.count(name) > 0) {
int tmp = name2idx[name];
// skip for label column
if (tmp > label_idx_) { tmp -= 1; }
ignore_features_.emplace(tmp);
} else {
Log::Fatal("Could not find ignore column %s in data file", name.c_str());
}
}
} else {
for (auto token : Common::Split(io_config.ignore_column.c_str(), ',')) {
int tmp = 0;
if (!Common::AtoiAndCheck(token.c_str(), &tmp)) {
Log::Fatal("ignore_column is not a number, \
if you want to use a column name, \
please add the prefix \"name:\" to the column name");
}
// skip for label column
if (tmp > label_idx_) { tmp -= 1; }
ignore_features_.emplace(tmp);
}
}
}
// load weight idx
if (io_config.weight_column.size() > 0) {
if (Common::StartsWith(io_config.weight_column, name_prefix)) {
std::string name = io_config.weight_column.substr(name_prefix.size());
if (name2idx.count(name) > 0) {
weight_idx_ = name2idx[name];
Log::Info("Using column %s as weight", name.c_str());
} else {
Log::Fatal("Could not find weight column %s in data file", name.c_str());
}
} else {
if (!Common::AtoiAndCheck(io_config.weight_column.c_str(), &weight_idx_)) {
Log::Fatal("weight_column is not a number, \
if you want to use a column name, \
please add the prefix \"name:\" to the column name");
}
Log::Info("Using column number %d as weight", weight_idx_);
}
// skip for label column
if (weight_idx_ > label_idx_) {
weight_idx_ -= 1;
}
ignore_features_.emplace(weight_idx_);
}
if (io_config.group_column.size() > 0) {
if (Common::StartsWith(io_config.group_column, name_prefix)) {
std::string name = io_config.group_column.substr(name_prefix.size());
if (name2idx.count(name) > 0) {
group_idx_ = name2idx[name];
Log::Info("Using column %s as group/query id", name.c_str());
} else {
Log::Fatal("Could not find group/query column %s in data file", name.c_str());
}
} else {
if (!Common::AtoiAndCheck(io_config.group_column.c_str(), &group_idx_)) {
Log::Fatal("group_column is not a number, \
if you want to use a column name, \
please add the prefix \"name:\" to the column name");
}
Log::Info("Using column number %d as group/query id", group_idx_);
}
// skip for label column
if (group_idx_ > label_idx_) {
group_idx_ -= 1;
}
ignore_features_.emplace(group_idx_);
}
// create text parser
parser_ = Parser::CreateParser(data_filename_, io_config.has_header, 0, label_idx_);
if (parser_ == nullptr) {
Log::Fatal("Could not recognize data format of %s", data_filename_);
}
} else {
// only need to load initialize score, other meta data will be loaded from binary file
metadata_.Init(init_score_filename, num_class_);
Log::Info("Loading data set from binary file");
parser_ = nullptr;
text_reader_ = nullptr;
}
}
Dataset::Dataset(const IOConfig& io_config, const PredictFunction& predict_fun)
:data_filename_(""), random_(io_config.data_random_seed),
max_bin_(io_config.max_bin), is_enable_sparse_(io_config.is_enable_sparse),
predict_fun_(predict_fun), bin_construct_sample_cnt_(io_config.bin_construct_sample_cnt) {
num_class_ = io_config.num_class;
parser_ = nullptr;
text_reader_ = nullptr;
}
Dataset::~Dataset() {
if (parser_ != nullptr) { delete parser_; }
if (text_reader_ != nullptr) { delete text_reader_; }
for (auto& feature : features_) {
delete feature;
}
features_.clear();
}
void Dataset::LoadDataToMemory(int rank, int num_machines, bool is_pre_partition) {
used_data_indices_.clear();
if (num_machines == 1 || is_pre_partition) {
// read all lines
num_data_ = text_reader_->ReadAllLines();
global_num_data_ = num_data_;
} else { // need partition data
// get query data
const data_size_t* query_boundaries = metadata_.query_boundaries();
if (query_boundaries == nullptr) {
// if not contain query data, minimal sample unit is one record
global_num_data_ = text_reader_->ReadAndFilterLines([this, rank, num_machines](data_size_t) {
if (random_.NextInt(0, num_machines) == rank) {
return true;
} else {
return false;
void Dataset::CopyFeatureMetadataTo(Dataset *dataset, bool is_enable_sparse) const {
dataset->features_.clear();
// copy feature bin mapper data
for (Feature* feature : features_) {
dataset->features_.push_back(new Feature(feature->feature_index(),
new BinMapper(*feature->bin_mapper()), dataset->num_data_, is_enable_sparse));
}
}, &used_data_indices_);
} else {
// if contain query data, minimal sample unit is one query
data_size_t num_queries = metadata_.num_queries();
data_size_t qid = -1;
bool is_query_used = false;
global_num_data_ = text_reader_->ReadAndFilterLines(
[this, rank, num_machines, &qid, &query_boundaries, &is_query_used, num_queries]
(data_size_t line_idx) {
if (qid >= num_queries) {
Log::Fatal("Current query exceeds the range of the query file, please ensure the query file is correct");
}
if (line_idx >= query_boundaries[qid + 1]) {
// if is new query
is_query_used = false;
if (random_.NextInt(0, num_machines) == rank) {
is_query_used = true;
}
++qid;
}
return is_query_used;
}, &used_data_indices_);
}
// set number of data
num_data_ = static_cast<data_size_t>(used_data_indices_.size());
}
}
void Dataset::SampleDataFromMemory(std::vector<std::string>* out_data) {
const size_t sample_cnt = static_cast<size_t>(num_data_ < bin_construct_sample_cnt_ ? num_data_ : bin_construct_sample_cnt_);
std::vector<size_t> sample_indices = random_.Sample(num_data_, sample_cnt);
out_data->clear();
for (size_t i = 0; i < sample_indices.size(); ++i) {
const size_t idx = sample_indices[i];
out_data->push_back(text_reader_->Lines()[idx]);
}
}
void Dataset::SampleDataFromFile(int rank, int num_machines, bool is_pre_partition,
std::vector<std::string>* out_data) {
used_data_indices_.clear();
const data_size_t sample_cnt = static_cast<data_size_t>(bin_construct_sample_cnt_);
if (num_machines == 1 || is_pre_partition) {
num_data_ = static_cast<data_size_t>(text_reader_->SampleFromFile(random_, sample_cnt, out_data));
global_num_data_ = num_data_;
} else { // need partition data
// get query data
const data_size_t* query_boundaries = metadata_.query_boundaries();
if (query_boundaries == nullptr) {
// if not contain query file, minimal sample unit is one record
global_num_data_ = text_reader_->SampleAndFilterFromFile([this, rank, num_machines]
(data_size_t) {
if (random_.NextInt(0, num_machines) == rank) {
return true;
} else {
return false;
}
}, &used_data_indices_, random_, sample_cnt, out_data);
} else {
// if contain query file, minimal sample unit is one query
data_size_t num_queries = metadata_.num_queries();
data_size_t qid = -1;
bool is_query_used = false;
global_num_data_ = text_reader_->SampleAndFilterFromFile(
[this, rank, num_machines, &qid, &query_boundaries, &is_query_used, num_queries]
(data_size_t line_idx) {
if (qid >= num_queries) {
Log::Fatal("Query id exceeds the range of the query file, \
please ensure the query file is correct");
}
if (line_idx >= query_boundaries[qid + 1]) {
// if is new query
is_query_used = false;
if (random_.NextInt(0, num_machines) == rank) {
is_query_used = true;
}
++qid;
}
return is_query_used;
}, &used_data_indices_, random_, sample_cnt, out_data);
}
num_data_ = static_cast<data_size_t>(used_data_indices_.size());
}
}
void Dataset::InitByBinMapper(std::vector<const BinMapper*> bin_mappers, data_size_t num_data) {
num_data_ = num_data;
global_num_data_ = num_data_;
// initialize label
metadata_.Init(num_data_, num_class_, -1, -1);
// free old memory
for (auto& feature : features_) {
delete feature;
}
features_.clear();
used_feature_map_ = std::vector<int>(bin_mappers.size(), -1);
for (size_t i = 0; i < bin_mappers.size(); ++i) {
if (bin_mappers[i] != nullptr) {
features_.push_back(new Feature(static_cast<int>(i), new BinMapper(bin_mappers[i]), num_data_, is_enable_sparse_));
used_feature_map_[i] = static_cast<int>(features_.size());
}
}
num_features_ = static_cast<int>(features_.size());
dataset->used_feature_map_ = used_feature_map_;
dataset->num_features_ = static_cast<int>(dataset->features_.size());
dataset->num_total_features_ = num_total_features_;
dataset->feature_names_ = feature_names_;
}
std::vector<const BinMapper*> Dataset::GetBinMappers() const {
@ -314,27 +51,6 @@ std::vector<const BinMapper*> Dataset::GetBinMappers() const {
return ret;
}
void Dataset::PushData(const std::vector<std::vector<std::pair<int, float>>>& datas, data_size_t start_idx, bool is_finished) {
// if doesn't need to prediction with initial model
#pragma omp parallel for schedule(guided)
for (data_size_t i = 0; i < static_cast<int>(datas.size()); ++i) {
const int tid = omp_get_thread_num();
for (auto& inner_data : datas[i]) {
int feature_idx = used_feature_map_[inner_data.first];
if (feature_idx >= 0) {
// if is used feature
features_[feature_idx]->PushData(tid, start_idx + i, inner_data.second);
}
}
}
if (is_finished) {
#pragma omp parallel for schedule(guided)
for (int i = 0; i < num_features_; ++i) {
features_[i]->FinishLoad();
}
}
}
void Dataset::SetField(const char* field_name, const void* field_data, data_size_t num_element, int type) {
std::string name(field_name);
name = Common::Trim(name);
@ -363,394 +79,6 @@ void Dataset::SetField(const char* field_name, const void* field_data, data_size
}
}
void Dataset::ConstructBinMappers(int rank, int num_machines, const std::vector<std::string>& sample_data) {
// sample_values[i][j], means the value of j-th sample on i-th feature
std::vector<std::vector<double>> sample_values;
// temp buffer for one line features and label
std::vector<std::pair<int, double>> oneline_features;
double label;
for (size_t i = 0; i < sample_data.size(); ++i) {
oneline_features.clear();
// parse features
parser_->ParseOneLine(sample_data[i].c_str(), &oneline_features, &label);
// push 0 first, then edit the value according existing feature values
for (auto& feature_values : sample_values) {
feature_values.push_back(0.0);
}
for (std::pair<int, double>& inner_data : oneline_features) {
if (static_cast<size_t>(inner_data.first) >= sample_values.size()) {
// if need expand feature set
size_t need_size = inner_data.first - sample_values.size() + 1;
for (size_t j = 0; j < need_size; ++j) {
// push i+1 0
sample_values.emplace_back(i + 1, 0.0f);
}
}
// edit the feature value
sample_values[inner_data.first][i] = inner_data.second;
}
}
features_.clear();
// -1 means doesn't use this feature
used_feature_map_ = std::vector<int>(sample_values.size(), -1);
num_total_features_ = static_cast<int>(sample_values.size());
// check the range of label_idx, weight_idx and group_idx
CHECK(label_idx_ >= 0 && label_idx_ <= num_total_features_);
CHECK(weight_idx_ < 0 || weight_idx_ < num_total_features_);
CHECK(group_idx_ < 0 || group_idx_ < num_total_features_);
// fill feature_names_ if not header
if (feature_names_.size() <= 0) {
for (int i = 0; i < num_total_features_; ++i) {
std::stringstream str_buf;
str_buf << "Column_" << i;
feature_names_.push_back(str_buf.str());
}
}
// start find bins
if (num_machines == 1) {
std::vector<BinMapper*> bin_mappers(sample_values.size());
// if only one machine, find bin locally
#pragma omp parallel for schedule(guided)
for (int i = 0; i < static_cast<int>(sample_values.size()); ++i) {
if (ignore_features_.count(i) > 0) {
bin_mappers[i] = nullptr;
continue;
}
bin_mappers[i] = new BinMapper();
bin_mappers[i]->FindBin(&sample_values[i], max_bin_);
}
for (size_t i = 0; i < sample_values.size(); ++i) {
if (bin_mappers[i] == nullptr) {
Log::Warning("Ignoring feature %s", feature_names_[i].c_str());
}
else if (!bin_mappers[i]->is_trival()) {
// map real feature index to used feature index
used_feature_map_[i] = static_cast<int>(features_.size());
// push new feature
features_.push_back(new Feature(static_cast<int>(i), bin_mappers[i],
num_data_, is_enable_sparse_));
} else {
// if feature is trival(only 1 bin), free spaces
Log::Warning("Ignoring feature %s, only has one value", feature_names_[i].c_str());
delete bin_mappers[i];
}
}
} else {
// if have multi-machines, need find bin distributed
// different machines will find bin for different features
// start and len will store the process feature indices for different machines
// machine i will find bins for features in [ strat[i], start[i] + len[i] )
int* start = new int[num_machines];
int* len = new int[num_machines];
int total_num_feature = static_cast<int>(sample_values.size());
int step = (total_num_feature + num_machines - 1) / num_machines;
if (step < 1) { step = 1; }
start[0] = 0;
for (int i = 0; i < num_machines - 1; ++i) {
len[i] = Common::Min<int>(step, total_num_feature - start[i]);
start[i + 1] = start[i] + len[i];
}
len[num_machines - 1] = total_num_feature - start[num_machines - 1];
// get size of bin mapper with max_bin_ size
int type_size = BinMapper::SizeForSpecificBin(max_bin_);
// since sizes of different feature may not be same, we expand all bin mapper to type_size
int buffer_size = type_size * total_num_feature;
char* input_buffer = new char[buffer_size];
char* output_buffer = new char[buffer_size];
// find local feature bins and copy to buffer
#pragma omp parallel for schedule(guided)
for (int i = 0; i < len[rank]; ++i) {
BinMapper* bin_mapper = new BinMapper();
bin_mapper->FindBin(&sample_values[start[rank] + i], max_bin_);
bin_mapper->CopyTo(input_buffer + i * type_size);
// don't need this any more
delete bin_mapper;
}
// convert to binary size
for (int i = 0; i < num_machines; ++i) {
start[i] *= type_size;
len[i] *= type_size;
}
// gather global feature bin mappers
Network::Allgather(input_buffer, buffer_size, start, len, output_buffer);
// restore features bins from buffer
for (int i = 0; i < total_num_feature; ++i) {
if (ignore_features_.count(i) > 0) {
Log::Warning("Ignoring feature %s", feature_names_[i].c_str());
continue;
}
BinMapper* bin_mapper = new BinMapper();
bin_mapper->CopyFrom(output_buffer + i * type_size);
if (!bin_mapper->is_trival()) {
used_feature_map_[i] = static_cast<int>(features_.size());
features_.push_back(new Feature(static_cast<int>(i), bin_mapper, num_data_, is_enable_sparse_));
} else {
Log::Warning("Ignoring feature %s, only has one value", feature_names_[i].c_str());
delete bin_mapper;
}
}
// free buffer
delete[] start;
delete[] len;
delete[] input_buffer;
delete[] output_buffer;
}
num_features_ = static_cast<int>(features_.size());
}
void Dataset::LoadTrainData(int rank, int num_machines, bool is_pre_partition, bool use_two_round_loading) {
// don't support query id in data file when training in parallel
if (num_machines > 1 && !is_pre_partition) {
if (group_idx_ > 0) {
Log::Fatal("Using a query id without pre-partitioning the data file is not supported for parallel training. \
Please use an additional query file or pre-partition the data");
}
}
used_data_indices_.clear();
if (!is_loading_from_binfile_ ) {
if (!use_two_round_loading) {
// read data to memory
LoadDataToMemory(rank, num_machines, is_pre_partition);
std::vector<std::string> sample_data;
// sample data
SampleDataFromMemory(&sample_data);
// construct feature bin mappers
ConstructBinMappers(rank, num_machines, sample_data);
// initialize label
metadata_.Init(num_data_, num_class_, weight_idx_, group_idx_);
// extract features
ExtractFeaturesFromMemory();
} else {
std::vector<std::string> sample_data;
// sample data from file
SampleDataFromFile(rank, num_machines, is_pre_partition, &sample_data);
// construct feature bin mappers
ConstructBinMappers(rank, num_machines, sample_data);
// initialize label
metadata_.Init(num_data_, num_class_, weight_idx_, group_idx_);
// extract features
ExtractFeaturesFromFile();
}
} else {
std::string bin_filename(data_filename_);
bin_filename.append(".bin");
// load data from binary file
LoadDataFromBinFile(bin_filename.c_str(), rank, num_machines, is_pre_partition);
}
// check meta data
metadata_.CheckOrPartition(static_cast<data_size_t>(global_num_data_), used_data_indices_);
// free memory
used_data_indices_.clear();
used_data_indices_.shrink_to_fit();
// need to check training data
CheckDataset();
}
void Dataset::LoadValidationData(const Dataset* train_set, bool use_two_round_loading) {
used_data_indices_.clear();
if (!is_loading_from_binfile_ ) {
if (!use_two_round_loading) {
// read data in memory
LoadDataToMemory(0, 1, false);
// initialize label
metadata_.Init(num_data_, num_class_, weight_idx_, group_idx_);
features_.clear();
// copy feature bin mapper data
for (Feature* feature : train_set->features_) {
features_.push_back(new Feature(feature->feature_index(), new BinMapper(*feature->bin_mapper()), num_data_, is_enable_sparse_));
}
used_feature_map_ = train_set->used_feature_map_;
num_features_ = static_cast<int>(features_.size());
num_total_features_ = train_set->num_total_features_;
feature_names_ = train_set->feature_names_;
// extract features
ExtractFeaturesFromMemory();
} else {
// Get number of lines of data file
num_data_ = static_cast<data_size_t>(text_reader_->CountLine());
// initialize label
metadata_.Init(num_data_, num_class_, weight_idx_, group_idx_);
features_.clear();
// copy feature bin mapper data
for (Feature* feature : train_set->features_) {
features_.push_back(new Feature(feature->feature_index(), new BinMapper(*feature->bin_mapper()), num_data_, is_enable_sparse_));
}
used_feature_map_ = train_set->used_feature_map_;
num_features_ = static_cast<int>(features_.size());
num_total_features_ = train_set->num_total_features_;
feature_names_ = train_set->feature_names_;
// extract features
ExtractFeaturesFromFile();
}
} else {
std::string bin_filename(data_filename_);
bin_filename.append(".bin");
// load from binary file
LoadDataFromBinFile(bin_filename.c_str(), 0, 1, false);
}
// not need to check validation data
// check meta data
metadata_.CheckOrPartition(static_cast<data_size_t>(global_num_data_), used_data_indices_);
// CheckDataset();
}
void Dataset::ExtractFeaturesFromMemory() {
std::vector<std::pair<int, double>> oneline_features;
double tmp_label = 0.0f;
if (predict_fun_ == nullptr) {
// if doesn't need to prediction with initial model
#pragma omp parallel for schedule(guided) private(oneline_features) firstprivate(tmp_label)
for (data_size_t i = 0; i < num_data_; ++i) {
const int tid = omp_get_thread_num();
oneline_features.clear();
// parser
parser_->ParseOneLine(text_reader_->Lines()[i].c_str(), &oneline_features, &tmp_label);
// set label
metadata_.SetLabelAt(i, static_cast<float>(tmp_label));
// free processed line:
text_reader_->Lines()[i].clear();
// shrink_to_fit will be very slow in linux, and seems not free memory, disable for now
// text_reader_->Lines()[i].shrink_to_fit();
// push data
for (auto& inner_data : oneline_features) {
int feature_idx = used_feature_map_[inner_data.first];
if (feature_idx >= 0) {
// if is used feature
features_[feature_idx]->PushData(tid, i, inner_data.second);
}
else {
if (inner_data.first == weight_idx_) {
metadata_.SetWeightAt(i, static_cast<float>(inner_data.second));
} else if (inner_data.first == group_idx_) {
metadata_.SetQueryAt(i, static_cast<data_size_t>(inner_data.second));
}
}
}
}
} else {
// if need to prediction with initial model
float* init_score = new float[num_data_ * num_class_];
#pragma omp parallel for schedule(guided) private(oneline_features) firstprivate(tmp_label)
for (data_size_t i = 0; i < num_data_; ++i) {
const int tid = omp_get_thread_num();
oneline_features.clear();
// parser
parser_->ParseOneLine(text_reader_->Lines()[i].c_str(), &oneline_features, &tmp_label);
// set initial score
std::vector<double> oneline_init_score = predict_fun_(oneline_features);
for (int k = 0; k < num_class_; ++k){
init_score[k * num_data_ + i] = static_cast<float>(oneline_init_score[k]);
}
// set label
metadata_.SetLabelAt(i, static_cast<float>(tmp_label));
// free processed line:
text_reader_->Lines()[i].clear();
// shrink_to_fit will be very slow in linux, and seems not free memory, disable for now
// text_reader_->Lines()[i].shrink_to_fit();
// push data
for (auto& inner_data : oneline_features) {
int feature_idx = used_feature_map_[inner_data.first];
if (feature_idx >= 0) {
// if is used feature
features_[feature_idx]->PushData(tid, i, inner_data.second);
}
else {
if (inner_data.first == weight_idx_) {
metadata_.SetWeightAt(i, static_cast<float>(inner_data.second));
} else if (inner_data.first == group_idx_) {
metadata_.SetQueryAt(i, static_cast<data_size_t>(inner_data.second));
}
}
}
}
// metadata_ will manage space of init_score
metadata_.SetInitScore(init_score, num_data_ * num_class_);
delete[] init_score;
}
#pragma omp parallel for schedule(guided)
for (int i = 0; i < num_features_; ++i) {
features_[i]->FinishLoad();
}
// text data can be free after loaded feature values
text_reader_->Clear();
}
void Dataset::ExtractFeaturesFromFile() {
float* init_score = nullptr;
if (predict_fun_ != nullptr) {
init_score = new float[num_data_ * num_class_];
}
std::function<void(data_size_t, const std::vector<std::string>&)> process_fun =
[this, &init_score]
(data_size_t start_idx, const std::vector<std::string>& lines) {
std::vector<std::pair<int, double>> oneline_features;
double tmp_label = 0.0f;
#pragma omp parallel for schedule(static) private(oneline_features) firstprivate(tmp_label)
for (data_size_t i = 0; i < static_cast<data_size_t>(lines.size()); ++i) {
const int tid = omp_get_thread_num();
oneline_features.clear();
// parser
parser_->ParseOneLine(lines[i].c_str(), &oneline_features, &tmp_label);
// set initial score
if (init_score != nullptr) {
std::vector<double> oneline_init_score = predict_fun_(oneline_features);
for (int k = 0; k < num_class_; ++k){
init_score[k * num_data_ + start_idx + i] = static_cast<float>(oneline_init_score[k]);
}
}
// set label
metadata_.SetLabelAt(start_idx + i, static_cast<float>(tmp_label));
// push data
for (auto& inner_data : oneline_features) {
int feature_idx = used_feature_map_[inner_data.first];
if (feature_idx >= 0) {
// if is used feature
features_[feature_idx]->PushData(tid, start_idx + i, inner_data.second);
}
else {
if (inner_data.first == weight_idx_) {
metadata_.SetWeightAt(start_idx + i, static_cast<float>(inner_data.second));
} else if (inner_data.first == group_idx_) {
metadata_.SetQueryAt(start_idx + i, static_cast<data_size_t>(inner_data.second));
}
}
}
}
};
if (used_data_indices_.size() > 0) {
// only need part of data
text_reader_->ReadPartAndProcessParallel(used_data_indices_, process_fun);
} else {
// need full data
text_reader_->ReadAllAndProcessParallel(process_fun);
}
// metadata_ will manage space of init_score
if (init_score != nullptr) {
metadata_.SetInitScore(init_score, num_data_ * num_class_);
delete[] init_score;
}
#pragma omp parallel for schedule(guided)
for (int i = 0; i < num_features_; ++i) {
features_[i]->FinishLoad();
}
}
void Dataset::SaveBinaryFile(const char* bin_filename) {
if (!is_loading_from_binfile_) {
@ -773,17 +101,14 @@ void Dataset::SaveBinaryFile(const char* bin_filename) {
Log::Info("Saving data to binary file %s", data_filename_);
// get size of header
size_t size_of_header = sizeof(global_num_data_) + sizeof(is_enable_sparse_)
+ sizeof(max_bin_) + sizeof(num_data_) + sizeof(num_features_) + sizeof(num_total_features_) +sizeof(size_t) + sizeof(int) * used_feature_map_.size();
size_t size_of_header = sizeof(num_data_) + sizeof(num_features_) + sizeof(num_total_features_)
+ sizeof(size_t) + sizeof(int) * used_feature_map_.size();
// size of feature names
for (int i = 0; i < num_total_features_; ++i) {
size_of_header += feature_names_[i].size() + sizeof(int);
}
fwrite(&size_of_header, sizeof(size_of_header), 1, file);
// write header
fwrite(&global_num_data_, sizeof(global_num_data_), 1, file);
fwrite(&is_enable_sparse_, sizeof(is_enable_sparse_), 1, file);
fwrite(&max_bin_, sizeof(max_bin_), 1, file);
fwrite(&num_data_, sizeof(num_data_), 1, file);
fwrite(&num_features_, sizeof(num_features_), 1, file);
fwrite(&num_total_features_, sizeof(num_features_), 1, file);
@ -817,196 +142,4 @@ void Dataset::SaveBinaryFile(const char* bin_filename) {
}
}
void Dataset::CheckCanLoadFromBin() {
std::string bin_filename(data_filename_);
bin_filename.append(".bin");
FILE* file;
#ifdef _MSC_VER
fopen_s(&file, bin_filename.c_str(), "rb");
#else
file = fopen(bin_filename.c_str(), "rb");
#endif
if (file == NULL) {
is_loading_from_binfile_ = false;
} else {
is_loading_from_binfile_ = true;
fclose(file);
}
}
void Dataset::LoadDataFromBinFile(const char* bin_filename, int rank, int num_machines, bool is_pre_partition) {
FILE* file;
#ifdef _MSC_VER
fopen_s(&file, bin_filename, "rb");
#else
file = fopen(bin_filename, "rb");
#endif
if (file == NULL) {
Log::Fatal("Cannot read binary data from %s", bin_filename);
}
// buffer to read binary file
size_t buffer_size = 16 * 1024 * 1024;
char* buffer = new char[buffer_size];
// read size of header
size_t read_cnt = fread(buffer, sizeof(size_t), 1, file);
if (read_cnt != 1) {
Log::Fatal("Binary file error: header has the wrong size");
}
size_t size_of_head = *(reinterpret_cast<size_t*>(buffer));
// re-allocmate space if not enough
if (size_of_head > buffer_size) {
delete[] buffer;
buffer_size = size_of_head;
buffer = new char[buffer_size];
}
// read header
read_cnt = fread(buffer, 1, size_of_head, file);
if (read_cnt != size_of_head) {
Log::Fatal("Binary file error: header is incorrect");
}
// get header
const char* mem_ptr = buffer;
global_num_data_ = *(reinterpret_cast<const size_t*>(mem_ptr));
mem_ptr += sizeof(global_num_data_);
is_enable_sparse_ = *(reinterpret_cast<const bool*>(mem_ptr));
mem_ptr += sizeof(is_enable_sparse_);
max_bin_ = *(reinterpret_cast<const int*>(mem_ptr));
mem_ptr += sizeof(max_bin_);
num_data_ = *(reinterpret_cast<const data_size_t*>(mem_ptr));
mem_ptr += sizeof(num_data_);
num_features_ = *(reinterpret_cast<const int*>(mem_ptr));
mem_ptr += sizeof(num_features_);
num_total_features_ = *(reinterpret_cast<const int*>(mem_ptr));
mem_ptr += sizeof(num_total_features_);
size_t num_used_feature_map = *(reinterpret_cast<const size_t*>(mem_ptr));
mem_ptr += sizeof(num_used_feature_map);
const int* tmp_feature_map = reinterpret_cast<const int*>(mem_ptr);
used_feature_map_.clear();
for (size_t i = 0; i < num_used_feature_map; ++i) {
used_feature_map_.push_back(tmp_feature_map[i]);
}
mem_ptr += sizeof(int) * num_used_feature_map;
// get feature names
feature_names_.clear();
// write feature names
for (int i = 0; i < num_total_features_; ++i) {
int str_len = *(reinterpret_cast<const int*>(mem_ptr));
mem_ptr += sizeof(int);
std::stringstream str_buf;
for (int j = 0; j < str_len; ++j) {
char tmp_char = *(reinterpret_cast<const char*>(mem_ptr));
mem_ptr += sizeof(char);
str_buf << tmp_char;
}
feature_names_.emplace_back(str_buf.str());
}
// read size of meta data
read_cnt = fread(buffer, sizeof(size_t), 1, file);
if (read_cnt != 1) {
Log::Fatal("Binary file error: meta data has the wrong size");
}
size_t size_of_metadata = *(reinterpret_cast<size_t*>(buffer));
// re-allocate space if not enough
if (size_of_metadata > buffer_size) {
delete[] buffer;
buffer_size = size_of_metadata;
buffer = new char[buffer_size];
}
// read meta data
read_cnt = fread(buffer, 1, size_of_metadata, file);
if (read_cnt != size_of_metadata) {
Log::Fatal("Binary file error: meta data is incorrect");
}
// load meta data
metadata_.LoadFromMemory(buffer);
used_data_indices_.clear();
global_num_data_ = num_data_;
// sample local used data if need to partition
if (num_machines > 1 && !is_pre_partition) {
const data_size_t* query_boundaries = metadata_.query_boundaries();
if (query_boundaries == nullptr) {
// if not contain query file, minimal sample unit is one record
for (data_size_t i = 0; i < num_data_; ++i) {
if (random_.NextInt(0, num_machines) == rank) {
used_data_indices_.push_back(i);
}
}
} else {
// if contain query file, minimal sample unit is one query
data_size_t num_queries = metadata_.num_queries();
data_size_t qid = -1;
bool is_query_used = false;
for (data_size_t i = 0; i < num_data_; ++i) {
if (qid >= num_queries) {
Log::Fatal("Current query exceeds the range of the query file, please ensure the query file is correct");
}
if (i >= query_boundaries[qid + 1]) {
// if is new query
is_query_used = false;
if (random_.NextInt(0, num_machines) == rank) {
is_query_used = true;
}
++qid;
}
if (is_query_used) {
used_data_indices_.push_back(i);
}
}
}
num_data_ = static_cast<data_size_t>(used_data_indices_.size());
}
metadata_.PartitionLabel(used_data_indices_);
// read feature data
for (int i = 0; i < num_features_; ++i) {
// read feature size
read_cnt = fread(buffer, sizeof(size_t), 1, file);
if (read_cnt != 1) {
Log::Fatal("Binary file error: feature %d has the wrong size", i);
}
size_t size_of_feature = *(reinterpret_cast<size_t*>(buffer));
// re-allocate space if not enough
if (size_of_feature > buffer_size) {
delete[] buffer;
buffer_size = size_of_feature;
buffer = new char[buffer_size];
}
read_cnt = fread(buffer, 1, size_of_feature, file);
if (read_cnt != size_of_feature) {
Log::Fatal("Binary file error: feature %d is incorrect, read count: %d", i, read_cnt);
}
features_.push_back(new Feature(buffer, static_cast<data_size_t>(global_num_data_), used_data_indices_));
}
delete[] buffer;
fclose(file);
}
void Dataset::CheckDataset() {
if (num_data_ <= 0) {
Log::Fatal("Data file %s is empty", data_filename_);
}
if (features_.size() <= 0) {
Log::Fatal("No usable features in data file %s", data_filename_);
}
}
} // namespace LightGBM

833
src/io/dataset_loader.cpp Normal file
Просмотреть файл

@ -0,0 +1,833 @@
#include <omp.h>
#include <LightGBM/utils/log.h>
#include <LightGBM/dataset_loader.h>
#include <LightGBM/feature.h>
#include <LightGBM/network.h>
namespace LightGBM {
DatasetLoader::DatasetLoader(const IOConfig& io_config, const PredictFunction& predict_fun)
:io_config_(io_config), predict_fun_(predict_fun){
}
DatasetLoader::~DatasetLoader() {
}
void DatasetLoader::SetHeadder(const char* filename) {
TextReader<data_size_t> text_reader(filename, io_config_.has_header);
std::unordered_map<std::string, int> name2idx;
// get column names
if (io_config_.has_header) {
std::string first_line = text_reader.first_line();
feature_names_ = Common::Split(first_line.c_str(), "\t ,");
for (size_t i = 0; i < feature_names_.size(); ++i) {
name2idx[feature_names_[i]] = static_cast<int>(i);
}
}
std::string name_prefix("name:");
// load label idx
if (io_config_.label_column.size() > 0) {
if (Common::StartsWith(io_config_.label_column, name_prefix)) {
std::string name = io_config_.label_column.substr(name_prefix.size());
if (name2idx.count(name) > 0) {
label_idx_ = name2idx[name];
Log::Info("Using column %s as label", name.c_str());
} else {
Log::Fatal("Could not find label column %s in data file", name.c_str());
}
} else {
if (!Common::AtoiAndCheck(io_config_.label_column.c_str(), &label_idx_)) {
Log::Fatal("label_column is not a number, \
if you want to use a column name, \
please add the prefix \"name:\" to the column name");
}
Log::Info("Using column number %d as label", label_idx_);
}
}
if (feature_names_.size() > 0) {
// erase label column name
feature_names_.erase(feature_names_.begin() + label_idx_);
}
// load ignore columns
if (io_config_.ignore_column.size() > 0) {
if (Common::StartsWith(io_config_.ignore_column, name_prefix)) {
std::string names = io_config_.ignore_column.substr(name_prefix.size());
for (auto name : Common::Split(names.c_str(), ',')) {
if (name2idx.count(name) > 0) {
int tmp = name2idx[name];
// skip for label column
if (tmp > label_idx_) { tmp -= 1; }
ignore_features_.emplace(tmp);
} else {
Log::Fatal("Could not find ignore column %s in data file", name.c_str());
}
}
} else {
for (auto token : Common::Split(io_config_.ignore_column.c_str(), ',')) {
int tmp = 0;
if (!Common::AtoiAndCheck(token.c_str(), &tmp)) {
Log::Fatal("ignore_column is not a number, \
if you want to use a column name, \
please add the prefix \"name:\" to the column name");
}
// skip for label column
if (tmp > label_idx_) { tmp -= 1; }
ignore_features_.emplace(tmp);
}
}
}
// load weight idx
if (io_config_.weight_column.size() > 0) {
if (Common::StartsWith(io_config_.weight_column, name_prefix)) {
std::string name = io_config_.weight_column.substr(name_prefix.size());
if (name2idx.count(name) > 0) {
weight_idx_ = name2idx[name];
Log::Info("Using column %s as weight", name.c_str());
} else {
Log::Fatal("Could not find weight column %s in data file", name.c_str());
}
} else {
if (!Common::AtoiAndCheck(io_config_.weight_column.c_str(), &weight_idx_)) {
Log::Fatal("weight_column is not a number, \
if you want to use a column name, \
please add the prefix \"name:\" to the column name");
}
Log::Info("Using column number %d as weight", weight_idx_);
}
// skip for label column
if (weight_idx_ > label_idx_) {
weight_idx_ -= 1;
}
ignore_features_.emplace(weight_idx_);
}
if (io_config_.group_column.size() > 0) {
if (Common::StartsWith(io_config_.group_column, name_prefix)) {
std::string name = io_config_.group_column.substr(name_prefix.size());
if (name2idx.count(name) > 0) {
group_idx_ = name2idx[name];
Log::Info("Using column %s as group/query id", name.c_str());
} else {
Log::Fatal("Could not find group/query column %s in data file", name.c_str());
}
} else {
if (!Common::AtoiAndCheck(io_config_.group_column.c_str(), &group_idx_)) {
Log::Fatal("group_column is not a number, \
if you want to use a column name, \
please add the prefix \"name:\" to the column name");
}
Log::Info("Using column number %d as group/query id", group_idx_);
}
// skip for label column
if (group_idx_ > label_idx_) {
group_idx_ -= 1;
}
ignore_features_.emplace(group_idx_);
}
}
Dataset* DatasetLoader::LoadFromFile(const char* filename, int rank, int num_machines) {
// don't support query id in data file when training in parallel
if (num_machines > 1 && !io_config_.is_pre_partition) {
if (group_idx_ > 0) {
Log::Fatal("Using a query id without pre-partitioning the data file is not supported for parallel training. \
Please use an additional query file or pre-partition the data");
}
}
auto parser = Parser::CreateParser(filename, io_config_.has_header, 0, label_idx_);
if (parser == nullptr) {
Log::Fatal("Could not recognize data format of %s", filename);
}
data_size_t num_global_data = 0;
std::vector<data_size_t> used_data_indices;
Dataset* dataset = new Dataset();
dataset->data_filename_ = filename;
dataset->num_class_ = io_config_.num_class;
dataset->metadata_.Init(filename, dataset->num_class_);
bool is_loading_from_binfile = CheckCanLoadFromBin(filename);
if (!is_loading_from_binfile) {
if (!io_config_.use_two_round_loading) {
// read data to memory
auto text_data = LoadTextDataToMemory(filename, dataset->metadata_, rank, num_machines,&num_global_data, &used_data_indices);
dataset->num_data_ = static_cast<data_size_t>(text_data.size());
// sample data
auto sample_data = SampleTextDataFromMemory(text_data);
// construct feature bin mappers
ConstructBinMappersFromTextData(rank, num_machines, sample_data, parser, dataset);
// initialize label
dataset->metadata_.Init(dataset->num_data_, io_config_.num_class, weight_idx_, group_idx_);
// extract features
ExtractFeaturesFromMemory(text_data, parser, dataset);
text_data.clear();
} else {
// sample data from file
auto sample_data = SampleTextDataFromFile(filename, dataset->metadata_, rank, num_machines, &num_global_data, &used_data_indices);
if (used_data_indices.size() > 0) {
dataset->num_data_ = static_cast<data_size_t>(used_data_indices.size());
} else {
dataset->num_data_ = num_global_data;
}
// construct feature bin mappers
ConstructBinMappersFromTextData(rank, num_machines, sample_data, parser, dataset);
// initialize label
dataset->metadata_.Init(dataset->num_data_, dataset->num_class_, weight_idx_, group_idx_);
// extract features
ExtractFeaturesFromFile(filename, parser, used_data_indices, dataset);
}
} else {
// load data from binary file
delete dataset;
std::string bin_filename(filename);
bin_filename.append(".bin");
dataset = LoadFromBinFile(bin_filename.c_str(), rank, num_machines);
}
// check meta data
dataset->metadata_.CheckOrPartition(num_global_data, used_data_indices);
// need to check training data
CheckDataset(dataset);
delete parser;
return dataset;
}
Dataset* DatasetLoader::LoadFromFileLikeOthers(const char* filename, const Dataset* other) {
auto parser = Parser::CreateParser(filename, io_config_.has_header, 0, label_idx_);
if (parser == nullptr) {
Log::Fatal("Could not recognize data format of %s", filename);
}
data_size_t num_global_data = 0;
std::vector<data_size_t> used_data_indices;
Dataset* dataset = new Dataset();
dataset->data_filename_ = filename;
dataset->num_class_ = io_config_.num_class;
dataset->metadata_.Init(filename, dataset->num_class_);
bool is_loading_from_binfile = CheckCanLoadFromBin(filename);
if (!is_loading_from_binfile) {
if (!io_config_.use_two_round_loading) {
// read data in memory
auto text_data = LoadTextDataToMemory(filename, dataset->metadata_, 0, 1, &num_global_data, &used_data_indices);
dataset->num_data_ = static_cast<data_size_t>(text_data.size());
// initialize label
dataset->metadata_.Init(dataset->num_data_, dataset->num_class_, weight_idx_, group_idx_);
other->CopyFeatureMetadataTo(dataset, io_config_.is_enable_sparse);
// extract features
ExtractFeaturesFromMemory(text_data, parser, dataset);
text_data.clear();
} else {
TextReader<data_size_t> text_reader(filename, io_config_.has_header);
// Get number of lines of data file
dataset->num_data_ = static_cast<data_size_t>(text_reader.CountLine());
num_global_data = dataset->num_data_;
// initialize label
dataset->metadata_.Init(dataset->num_data_, dataset->num_class_, weight_idx_, group_idx_);
other->CopyFeatureMetadataTo(dataset, io_config_.is_enable_sparse);
// extract features
ExtractFeaturesFromFile(filename, parser, used_data_indices, dataset);
}
} else {
// load data from binary file
delete dataset;
std::string bin_filename(filename);
bin_filename.append(".bin");
dataset = LoadFromBinFile(bin_filename.c_str(), 0, 1);
}
// not need to check validation data
// check meta data
dataset->metadata_.CheckOrPartition(num_global_data, used_data_indices);
delete parser;
return dataset;
}
Dataset* DatasetLoader::LoadFromBinFile(const char* bin_filename, int rank, int num_machines) {
Dataset* dataset = new Dataset();
FILE* file;
#ifdef _MSC_VER
fopen_s(&file, bin_filename, "rb");
#else
file = fopen(bin_filename, "rb");
#endif
if (file == NULL) {
Log::Fatal("Could not read binary data from %s", bin_filename);
}
// buffer to read binary file
size_t buffer_size = 16 * 1024 * 1024;
char* buffer = new char[buffer_size];
// read size of header
size_t read_cnt = fread(buffer, sizeof(size_t), 1, file);
if (read_cnt != 1) {
Log::Fatal("Binary file error: header has the wrong size");
}
size_t size_of_head = *(reinterpret_cast<size_t*>(buffer));
// re-allocmate space if not enough
if (size_of_head > buffer_size) {
delete[] buffer;
buffer_size = size_of_head;
buffer = new char[buffer_size];
}
// read header
read_cnt = fread(buffer, 1, size_of_head, file);
if (read_cnt != size_of_head) {
Log::Fatal("Binary file error: header is incorrect");
}
// get header
const char* mem_ptr = buffer;
dataset->num_data_ = *(reinterpret_cast<const data_size_t*>(mem_ptr));
mem_ptr += sizeof(dataset->num_data_);
dataset->num_features_ = *(reinterpret_cast<const int*>(mem_ptr));
mem_ptr += sizeof(dataset->num_features_);
dataset->num_total_features_ = *(reinterpret_cast<const int*>(mem_ptr));
mem_ptr += sizeof(dataset->num_total_features_);
size_t num_used_feature_map = *(reinterpret_cast<const size_t*>(mem_ptr));
mem_ptr += sizeof(num_used_feature_map);
const int* tmp_feature_map = reinterpret_cast<const int*>(mem_ptr);
dataset->used_feature_map_.clear();
for (size_t i = 0; i < num_used_feature_map; ++i) {
dataset->used_feature_map_.push_back(tmp_feature_map[i]);
}
mem_ptr += sizeof(int) * num_used_feature_map;
// get feature names
feature_names_.clear();
// write feature names
for (int i = 0; i < dataset->num_total_features_; ++i) {
int str_len = *(reinterpret_cast<const int*>(mem_ptr));
mem_ptr += sizeof(int);
std::stringstream str_buf;
for (int j = 0; j < str_len; ++j) {
char tmp_char = *(reinterpret_cast<const char*>(mem_ptr));
mem_ptr += sizeof(char);
str_buf << tmp_char;
}
feature_names_.emplace_back(str_buf.str());
}
// read size of meta data
read_cnt = fread(buffer, sizeof(size_t), 1, file);
if (read_cnt != 1) {
Log::Fatal("Binary file error: meta data has the wrong size");
}
size_t size_of_metadata = *(reinterpret_cast<size_t*>(buffer));
// re-allocate space if not enough
if (size_of_metadata > buffer_size) {
delete[] buffer;
buffer_size = size_of_metadata;
buffer = new char[buffer_size];
}
// read meta data
read_cnt = fread(buffer, 1, size_of_metadata, file);
if (read_cnt != size_of_metadata) {
Log::Fatal("Binary file error: meta data is incorrect");
}
// load meta data
dataset->metadata_.LoadFromMemory(buffer);
std::vector<data_size_t> used_data_indices;
data_size_t num_global_data = dataset->num_data_;
// sample local used data if need to partition
if (num_machines > 1 && !io_config_.is_pre_partition) {
const data_size_t* query_boundaries = dataset->metadata_.query_boundaries();
if (query_boundaries == nullptr) {
// if not contain query file, minimal sample unit is one record
for (data_size_t i = 0; i < dataset->num_data_; ++i) {
if (random_.NextInt(0, num_machines) == rank) {
used_data_indices.push_back(i);
}
}
} else {
// if contain query file, minimal sample unit is one query
data_size_t num_queries = dataset->metadata_.num_queries();
data_size_t qid = -1;
bool is_query_used = false;
for (data_size_t i = 0; i < dataset->num_data_; ++i) {
if (qid >= num_queries) {
Log::Fatal("Current query exceeds the range of the query file, please ensure the query file is correct");
}
if (i >= query_boundaries[qid + 1]) {
// if is new query
is_query_used = false;
if (random_.NextInt(0, num_machines) == rank) {
is_query_used = true;
}
++qid;
}
if (is_query_used) {
used_data_indices.push_back(i);
}
}
}
dataset->num_data_ = static_cast<data_size_t>(used_data_indices.size());
}
dataset->metadata_.PartitionLabel(used_data_indices);
// read feature data
for (int i = 0; i < dataset->num_features_; ++i) {
// read feature size
read_cnt = fread(buffer, sizeof(size_t), 1, file);
if (read_cnt != 1) {
Log::Fatal("Binary file error: feature %d has the wrong size", i);
}
size_t size_of_feature = *(reinterpret_cast<size_t*>(buffer));
// re-allocate space if not enough
if (size_of_feature > buffer_size) {
delete[] buffer;
buffer_size = size_of_feature;
buffer = new char[buffer_size];
}
read_cnt = fread(buffer, 1, size_of_feature, file);
if (read_cnt != size_of_feature) {
Log::Fatal("Binary file error: feature %d is incorrect, read count: %d", i, read_cnt);
}
dataset->features_.push_back(new Feature(buffer, num_global_data, used_data_indices));
}
delete[] buffer;
fclose(file);
dataset->is_loading_from_binfile_ = true;
return dataset;
}
// ---- private functions ----
void DatasetLoader::CheckDataset(const Dataset* dataset) {
if (dataset->num_data_ <= 0) {
Log::Fatal("Data file %s is empty", dataset->data_filename_);
}
if (dataset->features_.size() <= 0) {
Log::Fatal("No usable features in data file %s", dataset->data_filename_);
}
}
std::vector<std::string> DatasetLoader::LoadTextDataToMemory(const char* filename, const Metadata& metadata,
int rank, int num_machines, int* num_global_data,
std::vector<data_size_t>* used_data_indices) {
TextReader<data_size_t> text_reader(filename, io_config_.has_header);
used_data_indices->clear();
if (num_machines == 1 || io_config_.is_pre_partition) {
// read all lines
*num_global_data = text_reader.ReadAllLines();
} else { // need partition data
// get query data
const data_size_t* query_boundaries = metadata.query_boundaries();
if (query_boundaries == nullptr) {
// if not contain query data, minimal sample unit is one record
*num_global_data = text_reader.ReadAndFilterLines([this, rank, num_machines](data_size_t) {
if (random_.NextInt(0, num_machines) == rank) {
return true;
} else {
return false;
}
}, used_data_indices);
} else {
// if contain query data, minimal sample unit is one query
data_size_t num_queries = metadata.num_queries();
data_size_t qid = -1;
bool is_query_used = false;
*num_global_data = text_reader.ReadAndFilterLines(
[this, rank, num_machines, &qid, &query_boundaries, &is_query_used, num_queries]
(data_size_t line_idx) {
if (qid >= num_queries) {
Log::Fatal("Current query exceeds the range of the query file, please ensure the query file is correct");
}
if (line_idx >= query_boundaries[qid + 1]) {
// if is new query
is_query_used = false;
if (random_.NextInt(0, num_machines) == rank) {
is_query_used = true;
}
++qid;
}
return is_query_used;
}, used_data_indices);
}
}
return std::move(text_reader.Lines());
}
std::vector<std::string> DatasetLoader::SampleTextDataFromMemory(const std::vector<std::string>& data) {
const size_t sample_cnt = static_cast<size_t>(data.size() < io_config_.bin_construct_sample_cnt ? data.size() : io_config_.bin_construct_sample_cnt);
std::vector<size_t> sample_indices = random_.Sample(data.size(), sample_cnt);
std::vector<std::string> out;
for (size_t i = 0; i < sample_indices.size(); ++i) {
const size_t idx = sample_indices[i];
out.push_back(data[idx]);
}
return out;
}
std::vector<std::string> DatasetLoader::SampleTextDataFromFile(const char* filename, const Metadata& metadata, int rank, int num_machines, int* num_global_data, std::vector<data_size_t>* used_data_indices) {
const data_size_t sample_cnt = static_cast<data_size_t>(io_config_.bin_construct_sample_cnt);
TextReader<data_size_t> text_reader(filename, io_config_.has_header);
std::vector<std::string> out_data;
if (num_machines == 1 || io_config_.is_pre_partition) {
*num_global_data = static_cast<data_size_t>(text_reader.SampleFromFile(random_, sample_cnt, &out_data));
} else { // need partition data
// get query data
const data_size_t* query_boundaries = metadata.query_boundaries();
if (query_boundaries == nullptr) {
// if not contain query file, minimal sample unit is one record
*num_global_data = text_reader.SampleAndFilterFromFile([this, rank, num_machines]
(data_size_t) {
if (random_.NextInt(0, num_machines) == rank) {
return true;
} else {
return false;
}
}, used_data_indices, random_, sample_cnt, &out_data);
} else {
// if contain query file, minimal sample unit is one query
data_size_t num_queries = metadata.num_queries();
data_size_t qid = -1;
bool is_query_used = false;
*num_global_data = text_reader.SampleAndFilterFromFile(
[this, rank, num_machines, &qid, &query_boundaries, &is_query_used, num_queries]
(data_size_t line_idx) {
if (qid >= num_queries) {
Log::Fatal("Query id exceeds the range of the query file, \
please ensure the query file is correct");
}
if (line_idx >= query_boundaries[qid + 1]) {
// if is new query
is_query_used = false;
if (random_.NextInt(0, num_machines) == rank) {
is_query_used = true;
}
++qid;
}
return is_query_used;
}, used_data_indices, random_, sample_cnt, &out_data);
}
}
return out_data;
}
void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines, const std::vector<std::string>& sample_data, const Parser* parser, Dataset* dataset) {
// sample_values[i][j], means the value of j-th sample on i-th feature
std::vector<std::vector<double>> sample_values;
// temp buffer for one line features and label
std::vector<std::pair<int, double>> oneline_features;
double label;
for (size_t i = 0; i < sample_data.size(); ++i) {
oneline_features.clear();
// parse features
parser->ParseOneLine(sample_data[i].c_str(), &oneline_features, &label);
// push 0 first, then edit the value according existing feature values
for (auto& feature_values : sample_values) {
feature_values.push_back(0.0);
}
for (std::pair<int, double>& inner_data : oneline_features) {
if (static_cast<size_t>(inner_data.first) >= sample_values.size()) {
// if need expand feature set
size_t need_size = inner_data.first - sample_values.size() + 1;
for (size_t j = 0; j < need_size; ++j) {
// push i+1 0
sample_values.emplace_back(i + 1, 0.0f);
}
}
// edit the feature value
sample_values[inner_data.first][i] = inner_data.second;
}
}
dataset->features_.clear();
// -1 means doesn't use this feature
dataset->used_feature_map_ = std::vector<int>(sample_values.size(), -1);
dataset->num_total_features_ = static_cast<int>(sample_values.size());
// check the range of label_idx, weight_idx and group_idx
CHECK(label_idx_ >= 0 && label_idx_ <= dataset->num_total_features_);
CHECK(weight_idx_ < 0 || weight_idx_ < dataset->num_total_features_);
CHECK(group_idx_ < 0 || group_idx_ < dataset->num_total_features_);
// fill feature_names_ if not header
if (feature_names_.size() <= 0) {
for (int i = 0; i < dataset->num_total_features_; ++i) {
std::stringstream str_buf;
str_buf << "Column_" << i;
feature_names_.push_back(str_buf.str());
}
}
dataset->feature_names_ = feature_names_;
// start find bins
if (num_machines == 1) {
std::vector<BinMapper*> bin_mappers(sample_values.size());
// if only one machine, find bin locally
#pragma omp parallel for schedule(guided)
for (int i = 0; i < static_cast<int>(sample_values.size()); ++i) {
if (ignore_features_.count(i) > 0) {
bin_mappers[i] = nullptr;
continue;
}
bin_mappers[i] = new BinMapper();
bin_mappers[i]->FindBin(&sample_values[i], io_config_.max_bin);
}
for (size_t i = 0; i < sample_values.size(); ++i) {
if (bin_mappers[i] == nullptr) {
Log::Warning("Ignoring feature %s", feature_names_[i].c_str());
} else if (!bin_mappers[i]->is_trival()) {
// map real feature index to used feature index
dataset->used_feature_map_[i] = static_cast<int>(dataset->features_.size());
// push new feature
dataset->features_.push_back(new Feature(static_cast<int>(i), bin_mappers[i],
dataset->num_data_, io_config_.is_enable_sparse));
} else {
// if feature is trival(only 1 bin), free spaces
Log::Warning("Ignoring feature %s, only has one value", feature_names_[i].c_str());
delete bin_mappers[i];
}
}
} else {
// if have multi-machines, need find bin distributed
// different machines will find bin for different features
// start and len will store the process feature indices for different machines
// machine i will find bins for features in [ strat[i], start[i] + len[i] )
int* start = new int[num_machines];
int* len = new int[num_machines];
int total_num_feature = static_cast<int>(sample_values.size());
int step = (total_num_feature + num_machines - 1) / num_machines;
if (step < 1) { step = 1; }
start[0] = 0;
for (int i = 0; i < num_machines - 1; ++i) {
len[i] = Common::Min<int>(step, total_num_feature - start[i]);
start[i + 1] = start[i] + len[i];
}
len[num_machines - 1] = total_num_feature - start[num_machines - 1];
// get size of bin mapper with max_bin_ size
int type_size = BinMapper::SizeForSpecificBin(io_config_.max_bin);
// since sizes of different feature may not be same, we expand all bin mapper to type_size
int buffer_size = type_size * total_num_feature;
char* input_buffer = new char[buffer_size];
char* output_buffer = new char[buffer_size];
// find local feature bins and copy to buffer
#pragma omp parallel for schedule(guided)
for (int i = 0; i < len[rank]; ++i) {
BinMapper* bin_mapper = new BinMapper();
bin_mapper->FindBin(&sample_values[start[rank] + i], io_config_.max_bin);
bin_mapper->CopyTo(input_buffer + i * type_size);
// don't need this any more
delete bin_mapper;
}
// convert to binary size
for (int i = 0; i < num_machines; ++i) {
start[i] *= type_size;
len[i] *= type_size;
}
// gather global feature bin mappers
Network::Allgather(input_buffer, buffer_size, start, len, output_buffer);
// restore features bins from buffer
for (int i = 0; i < total_num_feature; ++i) {
if (ignore_features_.count(i) > 0) {
Log::Warning("Ignoring feature %s", feature_names_[i].c_str());
continue;
}
BinMapper* bin_mapper = new BinMapper();
bin_mapper->CopyFrom(output_buffer + i * type_size);
if (!bin_mapper->is_trival()) {
dataset->used_feature_map_[i] = static_cast<int>(dataset->features_.size());
dataset->features_.push_back(new Feature(static_cast<int>(i), bin_mapper, dataset->num_data_, io_config_.is_enable_sparse));
} else {
Log::Warning("Ignoring feature %s, only has one value", feature_names_[i].c_str());
delete bin_mapper;
}
}
// free buffer
delete[] start;
delete[] len;
delete[] input_buffer;
delete[] output_buffer;
}
dataset->num_features_ = static_cast<int>(dataset->features_.size());
}
/*! \brief Extract local features from memory */
void DatasetLoader::ExtractFeaturesFromMemory(std::vector<std::string>& text_data, const Parser* parser, Dataset* dataset) {
std::vector<std::pair<int, double>> oneline_features;
double tmp_label = 0.0f;
if (predict_fun_ == nullptr) {
// if doesn't need to prediction with initial model
#pragma omp parallel for schedule(guided) private(oneline_features) firstprivate(tmp_label)
for (data_size_t i = 0; i < dataset->num_data_; ++i) {
const int tid = omp_get_thread_num();
oneline_features.clear();
// parser
parser->ParseOneLine(text_data[i].c_str(), &oneline_features, &tmp_label);
// set label
dataset->metadata_.SetLabelAt(i, static_cast<float>(tmp_label));
// free processed line:
text_data[i].clear();
// shrink_to_fit will be very slow in linux, and seems not free memory, disable for now
// text_reader_->Lines()[i].shrink_to_fit();
// push data
for (auto& inner_data : oneline_features) {
int feature_idx = dataset->used_feature_map_[inner_data.first];
if (feature_idx >= 0) {
// if is used feature
dataset->features_[feature_idx]->PushData(tid, i, inner_data.second);
} else {
if (inner_data.first == weight_idx_) {
dataset->metadata_.SetWeightAt(i, static_cast<float>(inner_data.second));
} else if (inner_data.first == group_idx_) {
dataset->metadata_.SetQueryAt(i, static_cast<data_size_t>(inner_data.second));
}
}
}
}
} else {
// if need to prediction with initial model
float* init_score = new float[dataset->num_data_ * dataset->num_class_];
#pragma omp parallel for schedule(guided) private(oneline_features) firstprivate(tmp_label)
for (data_size_t i = 0; i < dataset->num_data_; ++i) {
const int tid = omp_get_thread_num();
oneline_features.clear();
// parser
parser->ParseOneLine(text_data[i].c_str(), &oneline_features, &tmp_label);
// set initial score
std::vector<double> oneline_init_score = predict_fun_(oneline_features);
for (int k = 0; k < dataset->num_class_; ++k) {
init_score[k * dataset->num_data_ + i] = static_cast<float>(oneline_init_score[k]);
}
// set label
dataset->metadata_.SetLabelAt(i, static_cast<float>(tmp_label));
// free processed line:
text_data[i].clear();
// shrink_to_fit will be very slow in linux, and seems not free memory, disable for now
// text_reader_->Lines()[i].shrink_to_fit();
// push data
for (auto& inner_data : oneline_features) {
int feature_idx = dataset->used_feature_map_[inner_data.first];
if (feature_idx >= 0) {
// if is used feature
dataset->features_[feature_idx]->PushData(tid, i, inner_data.second);
} else {
if (inner_data.first == weight_idx_) {
dataset->metadata_.SetWeightAt(i, static_cast<float>(inner_data.second));
} else if (inner_data.first == group_idx_) {
dataset->metadata_.SetQueryAt(i, static_cast<data_size_t>(inner_data.second));
}
}
}
}
// metadata_ will manage space of init_score
dataset->metadata_.SetInitScore(init_score, dataset->num_data_ * dataset->num_class_);
delete[] init_score;
}
#pragma omp parallel for schedule(guided)
for (int i = 0; i < dataset->num_features_; ++i) {
dataset->features_[i]->FinishLoad();
}
// text data can be free after loaded feature values
text_data.clear();
}
/*! \brief Extract local features from file */
void DatasetLoader::ExtractFeaturesFromFile(const char* filename, const Parser* parser, const std::vector<data_size_t>& used_data_indices, Dataset* dataset) {
float* init_score = nullptr;
if (predict_fun_ != nullptr) {
init_score = new float[dataset->num_data_ * dataset->num_class_];
}
std::function<void(data_size_t, const std::vector<std::string>&)> process_fun =
[this, &init_score, &parser, &dataset]
(data_size_t start_idx, const std::vector<std::string>& lines) {
std::vector<std::pair<int, double>> oneline_features;
double tmp_label = 0.0f;
#pragma omp parallel for schedule(static) private(oneline_features) firstprivate(tmp_label)
for (data_size_t i = 0; i < static_cast<data_size_t>(lines.size()); ++i) {
const int tid = omp_get_thread_num();
oneline_features.clear();
// parser
parser->ParseOneLine(lines[i].c_str(), &oneline_features, &tmp_label);
// set initial score
if (init_score != nullptr) {
std::vector<double> oneline_init_score = predict_fun_(oneline_features);
for (int k = 0; k < dataset->num_class_; ++k) {
init_score[k * dataset->num_data_ + start_idx + i] = static_cast<float>(oneline_init_score[k]);
}
}
// set label
dataset->metadata_.SetLabelAt(start_idx + i, static_cast<float>(tmp_label));
// push data
for (auto& inner_data : oneline_features) {
int feature_idx = dataset->used_feature_map_[inner_data.first];
if (feature_idx >= 0) {
// if is used feature
dataset->features_[feature_idx]->PushData(tid, start_idx + i, inner_data.second);
} else {
if (inner_data.first == weight_idx_) {
dataset->metadata_.SetWeightAt(start_idx + i, static_cast<float>(inner_data.second));
} else if (inner_data.first == group_idx_) {
dataset->metadata_.SetQueryAt(start_idx + i, static_cast<data_size_t>(inner_data.second));
}
}
}
}
};
TextReader<data_size_t> text_reader(filename, io_config_.has_header);
if (used_data_indices.size() > 0) {
// only need part of data
text_reader.ReadPartAndProcessParallel(used_data_indices, process_fun);
} else {
// need full data
text_reader.ReadAllAndProcessParallel(process_fun);
}
// metadata_ will manage space of init_score
if (init_score != nullptr) {
dataset->metadata_.SetInitScore(init_score, dataset->num_data_ * dataset->num_class_);
delete[] init_score;
}
#pragma omp parallel for schedule(guided)
for (int i = 0; i < dataset->num_features_; ++i) {
dataset->features_[i]->FinishLoad();
}
}
/*! \brief Check can load from binary file */
bool DatasetLoader::CheckCanLoadFromBin(const char* filename) {
std::string bin_filename(filename);
bin_filename.append(".bin");
FILE* file;
#ifdef _MSC_VER
fopen_s(&file, bin_filename.c_str(), "rb");
#else
file = fopen(bin_filename.c_str(), "rb");
#endif
if (file == NULL) {
return false;
} else {
fclose(file);
return true;
}
}
}

Просмотреть файл

@ -14,9 +14,8 @@ Metadata::Metadata()
}
void Metadata::Init(const char * data_filename, const char* init_score_filename, const int num_class) {
void Metadata::Init(const char * data_filename, const int num_class) {
data_filename_ = data_filename;
init_score_filename_ = init_score_filename;
num_class_ = num_class;
// for lambdarank, it needs query data for partition data in parallel learning
LoadQueryBoundaries();
@ -25,11 +24,6 @@ void Metadata::Init(const char * data_filename, const char* init_score_filename,
LoadInitialScore();
}
void Metadata::Init(const char* init_score_filename, const int num_class) {
init_score_filename_ = init_score_filename;
num_class_ = num_class;
LoadInitialScore();
}
Metadata::~Metadata() {
@ -294,10 +288,14 @@ void Metadata::LoadWeights() {
void Metadata::LoadInitialScore() {
num_init_score_ = 0;
if (init_score_filename_[0] == '\0') { return; }
TextReader<size_t> reader(init_score_filename_, false);
std::string init_score_filename(data_filename_);
// default weight file name
init_score_filename.append(".init");
TextReader<size_t> reader(init_score_filename.c_str(), false);
reader.ReadAllLines();
if (reader.Lines().size() <= 0) {
return;
}
Log::Info("Loading initial scores...");
num_init_score_ = static_cast<data_size_t>(reader.Lines().size());

Просмотреть файл

@ -161,6 +161,7 @@
<ClInclude Include="..\include\LightGBM\config.h" />
<ClInclude Include="..\include\LightGBM\c_api.h" />
<ClInclude Include="..\include\LightGBM\dataset.h" />
<ClInclude Include="..\include\LightGBM\dataset_loader.h" />
<ClInclude Include="..\include\LightGBM\feature.h" />
<ClInclude Include="..\include\LightGBM\meta.h" />
<ClInclude Include="..\include\LightGBM\metric.h" />
@ -208,6 +209,7 @@
<ClCompile Include="..\src\io\bin.cpp" />
<ClCompile Include="..\src\io\config.cpp" />
<ClCompile Include="..\src\io\dataset.cpp" />
<ClCompile Include="..\src\io\dataset_loader.cpp" />
<ClCompile Include="..\src\io\metadata.cpp" />
<ClCompile Include="..\src\io\parser.cpp" />
<ClCompile Include="..\src\io\tree.cpp" />

Просмотреть файл

@ -168,6 +168,9 @@
<ClInclude Include="..\include\LightGBM\c_api.h">
<Filter>include\LightGBM</Filter>
</ClInclude>
<ClInclude Include="..\include\LightGBM\dataset_loader.h">
<Filter>include\LightGBM</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\src\application\application.cpp">
@ -236,5 +239,8 @@
<ClCompile Include="..\src\c_api.cpp">
<Filter>src</Filter>
</ClCompile>
<ClCompile Include="..\src\io\dataset_loader.cpp">
<Filter>src\io</Filter>
</ClCompile>
</ItemGroup>
</Project>