feature: Add true streaming APIs to reduce client-side memory usage (#5299)

* Extract streaming to own PR

* small merge fixes and cleanup

* linting fixes

* fix cast warning

* Fix accidental deletion during branch transfer

* responded to initial triage comments

* Added more tests to use create-from-samples APIs

* added mutex and adjusted nclasses logic

* Fix thread-safety for pushing data to sparse bins through Push APIs

* lint and doc fixes

* Small SWIG fix

* nit fix

* Responded to StrikerRUS comments

* fix breaking change after merge with master

* Extract streaming to own PR

* small merge fixes and cleanup

* Fix accidental deletion during branch transfer

* responded to initial triage comments

* Added more tests to use create-from-samples APIs

* Fix rstcheck call in ci

* remove TODOs

* Extract streaming to own PR

* small merge fixes and cleanup

* Fix accidental deletion during branch transfer

* responded to initial triage comments

* Added more tests to use create-from-samples APIs

* Small SWIG fix

* remove ci change

* responded to shiyu1994 comments

* responded to StrikerRUS comments

* Fixes from StrikerRUS comments
This commit is contained in:
Scott Votaw 2022-08-10 01:31:32 -07:00 committed by GitHub
Parent 680f4b081e
Commit 0a5c5838eb
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 1416 additions and 53 deletions


@@ -584,6 +584,10 @@ if(BUILD_CPP_TEST)
FetchContent_MakeAvailable(googletest)
add_library(GTest::GTest ALIAS gtest)
endif()
set(LightGBM_TEST_HEADER_DIR ${PROJECT_SOURCE_DIR}/tests/cpp_tests)
include_directories(${LightGBM_TEST_HEADER_DIR})
file(GLOB CPP_TEST_SOURCES tests/cpp_tests/*.cpp)
if(MSVC)
set(
@@ -600,7 +604,7 @@ if(BUILD_CPP_TEST)
endforeach()
endif()
add_executable(testlightgbm ${CPP_TEST_SOURCES})
target_link_libraries(testlightgbm PRIVATE lightgbm_objs GTest::GTest)
target_link_libraries(testlightgbm PRIVATE lightgbm_objs lightgbm_capi_objs GTest::GTest)
endif()
install(


@@ -259,8 +259,13 @@ class Bin {
/*! \brief virtual destructor */
virtual ~Bin() {}
/*!
* \brief Initialize for pushing. By default, no action needed.
* \param num_thread The number of external threads that will be calling the push APIs
*/
virtual void InitStreaming(uint32_t /*num_thread*/) { }
/*!
* \brief Push one record
* \pram tid Thread id
* \param tid Thread id
* \param idx Index of record
* \param value bin value of record
*/


@@ -145,6 +145,23 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateByReference(const DatasetHandle referenc
int64_t num_total_row,
DatasetHandle* out);
/*!
* \brief Initialize the Dataset for streaming.
* \param dataset Handle of dataset
* \param has_weights Whether the dataset has Metadata weights
* \param has_init_scores Whether the dataset has Metadata initial scores
* \param has_queries Whether the dataset has Metadata queries/groups
* \param nclasses Number of initial score classes
* \param nthreads Number of external threads that will use the PushRows APIs
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_DatasetInitStreaming(DatasetHandle dataset,
int32_t has_weights,
int32_t has_init_scores,
int32_t has_queries,
int32_t nclasses,
int32_t nthreads);
/*!
* \brief Push data to existing dataset, if ``nrow + start_row == num_total_row``, will call ``dataset->FinishLoad``.
* \param dataset Handle of dataset
@@ -162,6 +179,38 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetPushRows(DatasetHandle dataset,
int32_t ncol,
int32_t start_row);
/*!
* \brief Push data to existing dataset.
* The general flow for a streaming scenario is:
* 1. create Dataset "schema" (e.g. ``LGBM_DatasetCreateFromSampledColumn``)
* 2. init them for thread-safe streaming (``LGBM_DatasetInitStreaming``)
* 3. push data (``LGBM_DatasetPushRowsWithMetadata`` or ``LGBM_DatasetPushRowsByCSRWithMetadata``)
* 4. call ``LGBM_DatasetMarkFinished``
* \param dataset Handle of dataset
* \param data Pointer to the data space
* \param data_type Type of ``data`` pointer, can be ``C_API_DTYPE_FLOAT32`` or ``C_API_DTYPE_FLOAT64``
* \param nrow Number of rows
* \param ncol Number of feature columns
* \param start_row Row start index, i.e., the index at which to start inserting data
* \param label Pointer to array with nrow labels
* \param weight Optional pointer to array with nrow weights
* \param init_score Optional pointer to array with nrow*nclasses initial scores, in column format
* \param query Optional pointer to array with nrow query values
* \param tid The id of the calling thread, from 0...N-1 threads
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_DatasetPushRowsWithMetadata(DatasetHandle dataset,
const void* data,
int data_type,
int32_t nrow,
int32_t ncol,
int32_t start_row,
const float* label,
const float* weight,
const double* init_score,
const int32_t* query,
int32_t tid);
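// A minimal, hedged sketch (not part of this diff) of the four-step flow
// documented above: dense doubles, one external pushing thread. The batch
// size, metadata choices, and the fill step are illustrative only, and error
// handling is reduced to early returns (the handle is leaked on failure).
#include <LightGBM/c_api.h>
#include <vector>
int stream_dense_example(DatasetHandle reference, int32_t total_rows, int32_t ncols) {
  DatasetHandle ds = nullptr;
  // 1. create the Dataset "schema" from an already-constructed reference
  if (LGBM_DatasetCreateByReference(reference, total_rows, &ds) != 0) return -1;
  // 2. init for thread-safe streaming: weights=yes, init_scores=no,
  //    queries=no, 1 class, 1 external pushing thread
  if (LGBM_DatasetInitStreaming(ds, 1, 0, 0, 1, 1) != 0) return -1;
  const int32_t batch = 100;  // assumes total_rows % batch == 0
  std::vector<double> features(batch * ncols);
  std::vector<float> labels(batch), weights(batch, 1.0f);
  for (int32_t start = 0; start < total_rows; start += batch) {
    // ... fill features/labels/weights for rows [start, start + batch) ...
    // 3. push one batch of rows (doubles, hence C_API_DTYPE_FLOAT64)
    if (LGBM_DatasetPushRowsWithMetadata(ds, features.data(), C_API_DTYPE_FLOAT64,
                                         batch, ncols, start, labels.data(),
                                         weights.data(), /*init_score=*/nullptr,
                                         /*query=*/nullptr, /*tid=*/0) != 0) return -1;
  }
  // 4. finish manually (LGBM_DatasetInitStreaming enabled wait_for_manual_finish)
  return LGBM_DatasetMarkFinished(ds);
}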
/*!
* \brief Push data to existing dataset, if ``nrow + start_row == num_total_row``, will call ``dataset->FinishLoad``.
* \param dataset Handle of dataset
@@ -187,6 +236,55 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetPushRowsByCSR(DatasetHandle dataset,
int64_t num_col,
int64_t start_row);
/*!
* \brief Push CSR data to existing dataset. (See ``LGBM_DatasetPushRowsWithMetadata`` for more details.)
* \param dataset Handle of dataset
* \param indptr Pointer to row headers
* \param indptr_type Type of ``indptr``, can be ``C_API_DTYPE_INT32`` or ``C_API_DTYPE_INT64``
* \param indices Pointer to column indices
* \param data Pointer to the data space
* \param data_type Type of ``data`` pointer, can be ``C_API_DTYPE_FLOAT32`` or ``C_API_DTYPE_FLOAT64``
* \param nindptr Number of rows in the matrix + 1
* \param nelem Number of nonzero elements in the matrix
* \param start_row Row start index
* \param label Pointer to array with nindptr-1 labels
* \param weight Optional pointer to array with nindptr-1 weights
* \param init_score Optional pointer to array with (nindptr-1)*nclasses initial scores, in column format
* \param query Optional pointer to array with nindptr-1 query values
* \param tid The id of the calling thread, from 0...N-1 threads
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_DatasetPushRowsByCSRWithMetadata(DatasetHandle dataset,
const void* indptr,
int indptr_type,
const int32_t* indices,
const void* data,
int data_type,
int64_t nindptr,
int64_t nelem,
int64_t start_row,
const float* label,
const float* weight,
const double* init_score,
const int32_t* query,
int32_t tid);
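// A minimal sketch (not from the patch; names are illustrative) of the CSR
// variant, pushing an entire matrix in one call from external thread 0.
// Note that nindptr is the row count plus one and nelem the nonzero count.
#include <LightGBM/c_api.h>
#include <vector>
int push_csr_example(DatasetHandle ds,
                     const std::vector<int32_t>& indptr,   // size nrows + 1
                     const std::vector<int32_t>& indices,  // column ids of nonzeros
                     const std::vector<double>& values,    // nonzero values
                     const std::vector<float>& labels) {   // size nrows
  return LGBM_DatasetPushRowsByCSRWithMetadata(
      ds, indptr.data(), C_API_DTYPE_INT32, indices.data(),
      values.data(), C_API_DTYPE_FLOAT64,
      static_cast<int64_t>(indptr.size()),  // nindptr
      static_cast<int64_t>(values.size()),  // nelem
      /*start_row=*/0, labels.data(),
      /*weight=*/nullptr, /*init_score=*/nullptr, /*query=*/nullptr, /*tid=*/0);
}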
/*!
* \brief Set whether or not the Dataset waits for a manual MarkFinished call or calls FinishLoad on itself automatically.
* Set to 1 for streaming scenario, and use ``LGBM_DatasetMarkFinished`` to manually finish the Dataset.
* \param dataset Handle of dataset
* \param wait Whether to wait or not (1 or 0)
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_DatasetSetWaitForManualFinish(DatasetHandle dataset, int wait);
/*!
* \brief Mark the Dataset as complete by calling ``dataset->FinishLoad``.
* \param dataset Handle of dataset
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_DatasetMarkFinished(DatasetHandle dataset);
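// A small usage sketch (an assumed pattern, not from the patch): the
// pre-existing push APIs can also be decoupled from automatic finishing by
// toggling the wait flag, which is what LGBM_DatasetInitStreaming sets
// internally.
#include <LightGBM/c_api.h>
int finish_manually_example(DatasetHandle ds) {
  if (LGBM_DatasetSetWaitForManualFinish(ds, 1) != 0) return -1;  // no auto-FinishLoad
  // ... push all rows via LGBM_DatasetPushRows / LGBM_DatasetPushRowsByCSR ...
  return LGBM_DatasetMarkFinished(ds);  // triggers dataset->FinishLoad()
}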
/*!
* \brief Create a dataset from CSR format.
* \param indptr Pointer to row headers


@@ -69,13 +69,30 @@ class Metadata {
~Metadata();
/*!
* \brief Initial work, will allocate space for label, weight(if exists) and query(if exists)
* \brief Initial work, will allocate space for label, weight (if exists) and query (if exists)
* \param num_data Number of training data
* \param weight_idx Index of weight column, < 0 means doesn't exists
* \param query_idx Index of query id column, < 0 means doesn't exists
*/
void Init(data_size_t num_data, int weight_idx, int query_idx);
/*!
* \brief Allocate space for label, weight (if exists), initial score (if exists) and query (if exists)
* \param num_data Number of data
* \param reference Reference metadata
*/
void InitByReference(data_size_t num_data, const Metadata* reference);
/*!
* \brief Allocate space for label, weight (if exists), initial score (if exists) and query (if exists)
* \param num_data Number of data rows
* \param has_weights Whether the metadata has weights
* \param has_init_scores Whether the metadata has initial scores
* \param has_queries Whether the metadata has queries
* \param nclasses Number of classes for initial scores
*/
void Init(data_size_t num_data, int32_t has_weights, int32_t has_init_scores, int32_t has_queries, int32_t nclasses);
/*!
* \brief Partition label by used indices
* \param used_indices Indices of local used
@@ -138,6 +155,19 @@ class Metadata {
weights_[idx] = value;
}
/*!
* \brief Set initial scores for one record. Note that init_score might have multiple columns and is stored in column format.
* \param idx Index of this record
* \param values Initial score values for this record, one per class
*/
inline void SetInitScoreAt(data_size_t idx, const double* values) {
const auto nclasses = num_classes();
const double* val_ptr = values;
for (int i = idx; i < nclasses * num_data_; i += num_data_, ++val_ptr) {
init_score_[i] = *val_ptr;
}
}
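// A standalone sketch (hypothetical sizes, not part of the patch) of the
// column-format indexing above: for num_data_ = 4 and nclasses = 2 the flat
// buffer is [r0c0, r1c0, r2c0, r3c0, r0c1, r1c1, r2c1, r3c1], so idx = 1
// writes values[0] -> slot 1 and values[1] -> slot 5.
#include <vector>
void set_init_score_at_sketch(std::vector<double>* buf, int num_data,
                              int nclasses, int idx, const double* values) {
  for (int c = 0; c < nclasses; ++c) {
    (*buf)[c * num_data + idx] = values[c];  // slots are num_data apart per class
  }
}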
/*!
* \brief Set Query Id for one record
* \param idx Index of this record
@@ -150,6 +180,26 @@ class Metadata {
/*! \brief Load initial scores from file */
void LoadInitialScore(const std::string& data_filename);
/*!
* \brief Insert data from the given arrays into the current data at a specified index
* \param start_index The target index to begin the insertion
* \param count Number of records to insert
* \param labels Pointer to label data
* \param weights Pointer to weight data, or null
* \param init_scores Pointer to init-score data, or null
* \param queries Pointer to query data, or null
*/
void InsertAt(data_size_t start_index,
data_size_t count,
const float* labels,
const float* weights,
const double* init_scores,
const int32_t* queries);
/*!
* \brief Perform any extra operations after all data has been loaded
*/
void FinishLoad();
/*!
* \brief Get weights, if not exists, will return nullptr
* \return Pointer of weights
@@ -212,6 +262,16 @@ class Metadata {
*/
inline int64_t num_init_score() const { return num_init_score_; }
/*!
* \brief Get number of classes
*/
inline int32_t num_classes() const {
if (num_data_ && num_init_score_) {
return static_cast<int>(num_init_score_ / num_data_);
}
return 1;
}
/*! \brief Disable copy */
Metadata& operator=(const Metadata&) = delete;
/*! \brief Disable copy */
@@ -230,8 +290,18 @@ class Metadata {
void LoadWeights();
/*! \brief Load query boundaries from file */
void LoadQueryBoundaries();
/*! \brief Load query wights */
void LoadQueryWeights();
/*! \brief Calculate query weights from queries */
void CalculateQueryWeights();
/*! \brief Calculate query boundaries from queries */
void CalculateQueryBoundaries();
/*! \brief Insert labels at the given index */
void InsertLabels(const label_t* labels, data_size_t start_index, data_size_t len);
/*! \brief Insert weights at the given index */
void InsertWeights(const label_t* weights, data_size_t start_index, data_size_t len);
/*! \brief Insert initial scores at the given index */
void InsertInitScores(const double* init_scores, data_size_t start_index, data_size_t len, data_size_t source_size);
/*! \brief Insert queries at the given index */
void InsertQueries(const data_size_t* queries, data_size_t start_index, data_size_t len);
/*! \brief Filename of current data */
std::string data_filename_;
/*! \brief Number of data */
@@ -374,6 +444,27 @@ class Dataset {
/*! \brief Destructor */
LIGHTGBM_EXPORT ~Dataset();
/*!
* \brief Initialize from the given reference
* \param num_data Number of data
* \param reference Reference dataset
*/
LIGHTGBM_EXPORT void InitByReference(data_size_t num_data, const Dataset* reference) {
metadata_.InitByReference(num_data, &reference->metadata());
}
LIGHTGBM_EXPORT void InitStreaming(data_size_t num_data,
int32_t has_weights,
int32_t has_init_scores,
int32_t has_queries,
int32_t nclasses,
int32_t nthreads) {
metadata_.Init(num_data, has_weights, has_init_scores, has_queries, nclasses);
for (int i = 0; i < num_groups_; ++i) {
feature_groups_[i]->InitStreaming(nthreads);
}
}
LIGHTGBM_EXPORT bool CheckAlign(const Dataset& other) const {
if (num_features_ != other.num_features_) {
return false;
@@ -452,6 +543,15 @@ class Dataset {
}
}
inline void InsertMetadataAt(data_size_t start_index,
data_size_t count,
const label_t* labels,
const label_t* weights,
const double* init_scores,
const data_size_t* queries) {
metadata_.InsertAt(start_index, count, labels, weights, init_scores, queries);
}
inline int RealFeatureIndex(int fidx) const {
return real_feature_idx_[fidx];
}
@@ -743,6 +843,18 @@ class Dataset {
/*! \brief Get Number of data */
inline data_size_t num_data() const { return num_data_; }
/*! \brief Get whether FinishLoad is automatically called when pushing last row. */
inline bool wait_for_manual_finish() const { return wait_for_manual_finish_; }
/*! \brief Set whether the Dataset is finished automatically when the last row is pushed or with a manual
* MarkFinished API call. Set to true for thread-safe streaming and/or if it will be coalesced later.
* FinishLoad should not be called on any Dataset that will be coalesced.
*/
inline void set_wait_for_manual_finish(bool value) {
std::lock_guard<std::mutex> lock(mutex_);
wait_for_manual_finish_ = value;
}
/*! \brief Disable copy */
Dataset& operator=(const Dataset&) = delete;
/*! \brief Disable copy */
@@ -834,12 +946,15 @@ class Dataset {
bool zero_as_missing_;
std::vector<int> feature_need_push_zeros_;
std::vector<std::vector<float>> raw_data_;
bool wait_for_manual_finish_;
bool has_raw_;
/*! map feature (inner index) to its index in the list of numeric (non-categorical) features */
std::vector<int> numeric_feature_map_;
int num_numeric_features_;
std::string device_type_;
int gpu_device_id_;
/*! \brief mutex for threading safe call */
std::mutex mutex_;
#ifdef USE_CUDA_EXP
std::unique_ptr<CUDAColumnData> cuda_column_data_;


@@ -189,14 +189,28 @@ class FeatureGroup {
/*! \brief Destructor */
~FeatureGroup() {}
/*!
* \brief Initialize for pushing in a streaming fashion. By default, no action needed.
* \param num_thread The number of external threads that will be calling the push APIs
*/
void InitStreaming(int32_t num_thread) {
if (is_multi_val_) {
for (int i = 0; i < num_feature_; ++i) {
multi_bin_data_[i]->InitStreaming(num_thread);
}
} else {
bin_data_->InitStreaming(num_thread);
}
}
/*!
* \brief Push one record, will auto convert to bin and push to bin data
* \param tid Thread id
* \param idx Index of record
* \param sub_feature_idx Index of the subfeature
* \param line_idx Index of record
* \param value feature value of record
*/
inline void PushData(int tid, int sub_feature_idx, data_size_t line_idx,
double value) {
inline void PushData(int tid, int sub_feature_idx, data_size_t line_idx, double value) {
uint32_t bin = bin_mappers_[sub_feature_idx]->ValueToBin(value);
if (bin == bin_mappers_[sub_feature_idx]->GetMostFreqBin()) {
return;


@@ -1004,12 +1004,29 @@ int LGBM_DatasetCreateByReference(const DatasetHandle reference,
DatasetHandle* out) {
API_BEGIN();
std::unique_ptr<Dataset> ret;
ret.reset(new Dataset(static_cast<data_size_t>(num_total_row)));
ret->CreateValid(reinterpret_cast<const Dataset*>(reference));
data_size_t nrows = static_cast<data_size_t>(num_total_row);
ret.reset(new Dataset(nrows));
const Dataset* reference_dataset = reinterpret_cast<const Dataset*>(reference);
ret->CreateValid(reference_dataset);
ret->InitByReference(nrows, reference_dataset);
*out = ret.release();
API_END();
}
int LGBM_DatasetInitStreaming(DatasetHandle dataset,
int32_t has_weights,
int32_t has_init_scores,
int32_t has_queries,
int32_t nclasses,
int32_t nthreads) {
API_BEGIN();
auto p_dataset = reinterpret_cast<Dataset*>(dataset);
auto num_data = p_dataset->num_data();
p_dataset->InitStreaming(num_data, has_weights, has_init_scores, has_queries, nclasses, nthreads);
p_dataset->set_wait_for_manual_finish(true);
API_END();
}
int LGBM_DatasetPushRows(DatasetHandle dataset,
const void* data,
int data_type,
@@ -1032,7 +1049,52 @@ int LGBM_DatasetPushRows(DatasetHandle dataset,
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
if (start_row + nrow == p_dataset->num_data()) {
if (!p_dataset->wait_for_manual_finish() && (start_row + nrow == p_dataset->num_data())) {
p_dataset->FinishLoad();
}
API_END();
}
int LGBM_DatasetPushRowsWithMetadata(DatasetHandle dataset,
const void* data,
int data_type,
int32_t nrow,
int32_t ncol,
int32_t start_row,
const float* labels,
const float* weights,
const double* init_scores,
const int32_t* queries,
int32_t tid) {
API_BEGIN();
#ifdef LABEL_T_USE_DOUBLE
Log::Fatal("Don't support LABEL_T_USE_DOUBLE");
#endif
if (!data) {
Log::Fatal("data cannot be null.");
}
const int num_omp_threads = OMP_NUM_THREADS();
auto p_dataset = reinterpret_cast<Dataset*>(dataset);
auto get_row_fun = RowFunctionFromDenseMatric(data, nrow, ncol, data_type, 1);
if (p_dataset->has_raw()) {
p_dataset->ResizeRaw(p_dataset->num_numeric_features() + nrow);
}
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
for (int i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
// convert internal thread id to be unique based on external thread id
const int internal_tid = omp_get_thread_num() + (num_omp_threads * tid);
auto one_row = get_row_fun(i);
p_dataset->PushOneRow(internal_tid, start_row + i, one_row);
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
p_dataset->InsertMetadataAt(start_row, nrow, labels, weights, init_scores, queries);
if (!p_dataset->wait_for_manual_finish() && (start_row + nrow == p_dataset->num_data())) {
p_dataset->FinishLoad();
}
API_END();
@@ -1065,12 +1127,74 @@ int LGBM_DatasetPushRowsByCSR(DatasetHandle dataset,
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
if (start_row + nrow == static_cast<int64_t>(p_dataset->num_data())) {
if (!p_dataset->wait_for_manual_finish() && (start_row + nrow == static_cast<int64_t>(p_dataset->num_data()))) {
p_dataset->FinishLoad();
}
API_END();
}
int LGBM_DatasetPushRowsByCSRWithMetadata(DatasetHandle dataset,
const void* indptr,
int indptr_type,
const int32_t* indices,
const void* data,
int data_type,
int64_t nindptr,
int64_t nelem,
int64_t start_row,
const float* labels,
const float* weights,
const double* init_scores,
const int32_t* queries,
int32_t tid) {
API_BEGIN();
#ifdef LABEL_T_USE_DOUBLE
Log::Fatal("Don't support LABEL_T_USE_DOUBLE");
#endif
if (!data) {
Log::Fatal("data cannot be null.");
}
const int num_omp_threads = OMP_NUM_THREADS();
auto p_dataset = reinterpret_cast<Dataset*>(dataset);
auto get_row_fun = RowFunctionFromCSR<int>(indptr, indptr_type, indices, data, data_type, nindptr, nelem);
int32_t nrow = static_cast<int32_t>(nindptr - 1);
if (p_dataset->has_raw()) {
p_dataset->ResizeRaw(p_dataset->num_numeric_features() + nrow);
}
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
for (int i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
// convert internal thread id to be unique based on external thread id
const int internal_tid = omp_get_thread_num() + (num_omp_threads * tid);
auto one_row = get_row_fun(i);
p_dataset->PushOneRow(internal_tid, static_cast<data_size_t>(start_row + i), one_row);
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
p_dataset->InsertMetadataAt(static_cast<int32_t>(start_row), nrow, labels, weights, init_scores, queries);
if (!p_dataset->wait_for_manual_finish() && (start_row + nrow == static_cast<int64_t>(p_dataset->num_data()))) {
p_dataset->FinishLoad();
}
API_END();
}
int LGBM_DatasetSetWaitForManualFinish(DatasetHandle dataset, int wait) {
API_BEGIN();
auto p_dataset = reinterpret_cast<Dataset*>(dataset);
p_dataset->set_wait_for_manual_finish(wait);
API_END();
}
int LGBM_DatasetMarkFinished(DatasetHandle dataset) {
API_BEGIN();
auto p_dataset = reinterpret_cast<Dataset*>(dataset);
p_dataset->FinishLoad();
API_END();
}
int LGBM_DatasetCreateFromMat(const void* data,
int data_type,
int32_t nrow,


@@ -26,6 +26,7 @@ Dataset::Dataset() {
data_filename_ = "noname";
num_data_ = 0;
is_finish_load_ = false;
wait_for_manual_finish_ = false;
has_raw_ = false;
}
@@ -35,13 +36,14 @@ Dataset::Dataset(data_size_t num_data) {
num_data_ = num_data;
metadata_.Init(num_data_, NO_SPECIFIC, NO_SPECIFIC);
is_finish_load_ = false;
wait_for_manual_finish_ = false;
group_bin_boundaries_.push_back(0);
has_raw_ = false;
}
Dataset::~Dataset() {}
std::vector<std::vector<int>> NoGroup(const std::vector<int>& used_features) {
std::vector<std::vector<int>> OneFeaturePerGroup(const std::vector<int>& used_features) {
std::vector<std::vector<int>> features_in_group;
features_in_group.resize(used_features.size());
for (size_t i = 0; i < used_features.size(); ++i) {
@@ -318,9 +320,12 @@ std::vector<std::vector<int>> FastFeatureBundling(
void Dataset::Construct(std::vector<std::unique_ptr<BinMapper>>* bin_mappers,
int num_total_features,
const std::vector<std::vector<double>>& forced_bins,
int** sample_non_zero_indices, double** sample_values,
const int* num_per_col, int num_sample_col,
size_t total_sample_cnt, const Config& io_config) {
int** sample_non_zero_indices,
double** sample_values,
const int* num_per_col,
int num_sample_col,
size_t total_sample_cnt,
const Config& io_config) {
num_total_features_ = num_total_features;
CHECK_EQ(num_total_features_, static_cast<int>(bin_mappers->size()));
// get num_features
@@ -337,7 +342,7 @@ void Dataset::Construct(std::vector<std::unique_ptr<BinMapper>>* bin_mappers,
"Decreasing Dataset parameters min_data_in_bin or min_data_in_leaf and re-constructing "
"Dataset might resolve this warning.");
}
auto features_in_group = NoGroup(used_features);
auto features_in_group = OneFeaturePerGroup(used_features);
auto is_sparse = io_config.is_enable_sparse;
if (io_config.device_type == std::string("cuda") || io_config.device_type == std::string("cuda_exp")) {
@@ -440,6 +445,8 @@ void Dataset::FinishLoad() {
feature_groups_[i]->FinishLoad();
}
}
metadata_.FinishLoad();
#ifdef USE_CUDA_EXP
if (device_type_ == std::string("cuda_exp")) {
CreateCUDAColumnData();


@@ -28,7 +28,7 @@ void Metadata::Init(const char* data_filename) {
// for lambdarank, it needs query data for partition data in distributed learning
LoadQueryBoundaries();
LoadWeights();
LoadQueryWeights();
CalculateQueryWeights();
LoadInitialScore(data_filename_);
}
@@ -58,6 +58,41 @@ void Metadata::Init(data_size_t num_data, int weight_idx, int query_idx) {
}
}
void Metadata::InitByReference(data_size_t num_data, const Metadata* reference) {
int has_weights = reference->num_weights_ > 0;
int has_init_scores = reference->num_init_score_ > 0;
int has_queries = reference->num_queries_ > 0;
int nclasses = reference->num_classes();
Init(num_data, has_weights, has_init_scores, has_queries, nclasses);
}
void Metadata::Init(data_size_t num_data, int32_t has_weights, int32_t has_init_scores, int32_t has_queries, int32_t nclasses) {
num_data_ = num_data;
label_ = std::vector<label_t>(num_data_);
if (has_weights) {
if (!weights_.empty()) {
Log::Fatal("Calling Init() on Metadata weights that have already been initialized");
}
weights_.resize(num_data_, 0.0f);
num_weights_ = num_data_;
weight_load_from_file_ = false;
}
if (has_init_scores) {
if (!init_score_.empty()) {
Log::Fatal("Calling Init() on Metadata initial scores that have already been initialized");
}
num_init_score_ = static_cast<int64_t>(num_data) * nclasses;
init_score_.resize(num_init_score_, 0);
}
if (has_queries) {
if (!query_weights_.empty()) {
Log::Fatal("Calling Init() on Metadata queries that have already been initialized");
}
queries_.resize(num_data_, 0);
query_load_from_file_ = false;
}
}
void Metadata::Init(const Metadata& fullset, const data_size_t* used_indices, data_size_t num_used_indices) {
num_data_ = num_used_indices;
@@ -141,33 +176,37 @@ void Metadata::PartitionLabel(const std::vector<data_size_t>& used_indices) {
old_label.clear();
}
void Metadata::CalculateQueryBoundaries() {
if (!queries_.empty()) {
// need convert query_id to boundaries
std::vector<data_size_t> tmp_buffer;
data_size_t last_qid = -1;
data_size_t cur_cnt = 0;
for (data_size_t i = 0; i < num_data_; ++i) {
if (last_qid != queries_[i]) {
if (cur_cnt > 0) {
tmp_buffer.push_back(cur_cnt);
}
cur_cnt = 0;
last_qid = queries_[i];
}
++cur_cnt;
}
tmp_buffer.push_back(cur_cnt);
query_boundaries_ = std::vector<data_size_t>(tmp_buffer.size() + 1);
num_queries_ = static_cast<data_size_t>(tmp_buffer.size());
query_boundaries_[0] = 0;
for (size_t i = 0; i < tmp_buffer.size(); ++i) {
query_boundaries_[i + 1] = query_boundaries_[i] + tmp_buffer[i];
}
CalculateQueryWeights();
queries_.clear();
}
}
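// A worked example (hypothetical ids, not from the patch) of the conversion
// above: consecutive per-row query ids {7, 7, 9, 9, 9, 3} give counts
// {2, 3, 1} and boundaries {0, 2, 5, 6}. Equivalent standalone logic:
#include <vector>
std::vector<int> boundaries_from_query_ids(const std::vector<int>& qids) {
  std::vector<int> bounds{0};
  for (size_t i = 1; i <= qids.size(); ++i) {
    // close a group when the id changes or the input ends
    if (i == qids.size() || qids[i] != qids[i - 1]) {
      bounds.push_back(static_cast<int>(i));
    }
  }
  return bounds;
}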
void Metadata::CheckOrPartition(data_size_t num_all_data, const std::vector<data_size_t>& used_data_indices) {
if (used_data_indices.empty()) {
if (!queries_.empty()) {
// need convert query_id to boundaries
std::vector<data_size_t> tmp_buffer;
data_size_t last_qid = -1;
data_size_t cur_cnt = 0;
for (data_size_t i = 0; i < num_data_; ++i) {
if (last_qid != queries_[i]) {
if (cur_cnt > 0) {
tmp_buffer.push_back(cur_cnt);
}
cur_cnt = 0;
last_qid = queries_[i];
}
++cur_cnt;
}
tmp_buffer.push_back(cur_cnt);
query_boundaries_ = std::vector<data_size_t>(tmp_buffer.size() + 1);
num_queries_ = static_cast<data_size_t>(tmp_buffer.size());
query_boundaries_[0] = 0;
for (size_t i = 0; i < tmp_buffer.size(); ++i) {
query_boundaries_[i + 1] = query_boundaries_[i] + tmp_buffer[i];
}
LoadQueryWeights();
queries_.clear();
}
CalculateQueryBoundaries();
// check weights
if (!weights_.empty() && num_weights_ != num_data_) {
weights_.clear();
@@ -277,8 +316,8 @@ void Metadata::CheckOrPartition(data_size_t num_all_data, const std::vector<data
old_scores.clear();
}
}
// re-load query weight
LoadQueryWeights();
// re-calculate query weight
CalculateQueryWeights();
}
if (num_queries_ > 0) {
Log::Debug("Number of queries in %s: %i. Average number of rows per query: %f.",
@@ -312,6 +351,28 @@ void Metadata::SetInitScore(const double* init_score, data_size_t len) {
#endif // USE_CUDA_EXP
}
void Metadata::InsertInitScores(const double* init_scores, data_size_t start_index, data_size_t len, data_size_t source_size) {
if (num_init_score_ <= 0) {
Log::Fatal("Inserting initial score data into dataset with no initial scores");
}
if (start_index + len > num_data_) {
// Note that len here is row count, not num_init_score, so we compare against num_data
Log::Fatal("Inserted initial score data is too large for dataset");
}
if (init_score_.empty()) { init_score_.resize(num_init_score_); }
int nclasses = num_classes();
for (int32_t col = 0; col < nclasses; ++col) {
int32_t dest_offset = num_data_ * col + start_index;
// We need to use source_size here, because len might not equal size (due to a partially loaded dataset)
int32_t source_offset = source_size * col;
memcpy(init_score_.data() + dest_offset, init_scores + source_offset, sizeof(double) * len);
}
init_score_load_from_file_ = false;
// CUDA is handled after all insertions are complete
}
void Metadata::SetLabel(const label_t* label, data_size_t len) {
std::lock_guard<std::mutex> lock(mutex_);
if (label == nullptr) {
@@ -333,6 +394,20 @@ void Metadata::SetLabel(const label_t* label, data_size_t len) {
#endif // USE_CUDA_EXP
}
void Metadata::InsertLabels(const label_t* labels, data_size_t start_index, data_size_t len) {
if (labels == nullptr) {
Log::Fatal("label cannot be nullptr");
}
if (start_index + len > num_data_) {
Log::Fatal("Inserted label data is too large for dataset");
}
if (label_.empty()) { label_.resize(num_data_); }
memcpy(label_.data() + start_index, labels, sizeof(label_t) * len);
// CUDA is handled after all insertions are complete
}
void Metadata::SetWeights(const label_t* weights, data_size_t len) {
std::lock_guard<std::mutex> lock(mutex_);
// save to nullptr
@@ -351,7 +426,7 @@ void Metadata::SetWeights(const label_t* weights, data_size_t len) {
for (data_size_t i = 0; i < num_weights_; ++i) {
weights_[i] = Common::AvoidInf(weights[i]);
}
LoadQueryWeights();
CalculateQueryWeights();
weight_load_from_file_ = false;
#ifdef USE_CUDA_EXP
if (cuda_metadata_ != nullptr) {
@@ -360,6 +435,24 @@ void Metadata::SetWeights(const label_t* weights, data_size_t len) {
#endif // USE_CUDA_EXP
}
void Metadata::InsertWeights(const label_t* weights, data_size_t start_index, data_size_t len) {
if (!weights) {
Log::Fatal("Passed null weights");
}
if (num_weights_ <= 0) {
Log::Fatal("Inserting weight data into dataset with no weights");
}
if (start_index + len > num_weights_) {
Log::Fatal("Inserted weight data is too large for dataset");
}
if (weights_.empty()) { weights_.resize(num_weights_); }
memcpy(weights_.data() + start_index, weights, sizeof(label_t) * len);
weight_load_from_file_ = false;
// CUDA is handled after all insertions are complete
}
void Metadata::SetQuery(const data_size_t* query, data_size_t len) {
std::lock_guard<std::mutex> lock(mutex_);
// save to nullptr
@@ -382,7 +475,7 @@ void Metadata::SetQuery(const data_size_t* query, data_size_t len) {
for (data_size_t i = 0; i < num_queries_; ++i) {
query_boundaries_[i + 1] = query_boundaries_[i] + query[i];
}
LoadQueryWeights();
CalculateQueryWeights();
query_load_from_file_ = false;
#ifdef USE_CUDA_EXP
if (cuda_metadata_ != nullptr) {
@@ -396,6 +489,23 @@ void Metadata::SetQuery(const data_size_t* query, data_size_t len) {
#endif // USE_CUDA_EXP
}
void Metadata::InsertQueries(const data_size_t* queries, data_size_t start_index, data_size_t len) {
if (!queries) {
Log::Fatal("Passed null queries");
}
if (queries_.size() <= 0) {
Log::Fatal("Inserting query data into dataset with no queries");
}
if (static_cast<size_t>(start_index + len) > queries_.size()) {
Log::Fatal("Inserted query data is too large for dataset");
}
memcpy(queries_.data() + start_index, queries, sizeof(data_size_t) * len);
query_load_from_file_ = false;
// CUDA is handled after all insertions are complete
}
void Metadata::LoadWeights() {
num_weights_ = 0;
std::string weight_filename(data_filename_);
@@ -472,7 +582,7 @@ void Metadata::LoadQueryBoundaries() {
if (reader.Lines().empty()) {
return;
}
Log::Info("Loading query boundaries...");
Log::Info("Calculating query boundaries...");
query_boundaries_ = std::vector<data_size_t>(reader.Lines().size() + 1);
num_queries_ = static_cast<data_size_t>(reader.Lines().size());
query_boundaries_[0] = 0;
@@ -484,12 +594,12 @@ void Metadata::LoadQueryBoundaries() {
query_load_from_file_ = true;
}
void Metadata::LoadQueryWeights() {
void Metadata::CalculateQueryWeights() {
if (weights_.size() == 0 || query_boundaries_.size() == 0) {
return;
}
query_weights_.clear();
Log::Info("Loading query weights...");
Log::Info("Calculating query weights...");
query_weights_ = std::vector<label_t>(num_queries_);
for (data_size_t i = 0; i < num_queries_; ++i) {
query_weights_[i] = 0.0f;
@@ -500,6 +610,31 @@ void Metadata::LoadQueryWeights() {
}
}
void Metadata::InsertAt(data_size_t start_index,
data_size_t count,
const float* labels,
const float* weights,
const double* init_scores,
const int32_t* queries) {
if (num_data_ < count + start_index) {
Log::Fatal("Length of metadata is too long to append #data");
}
InsertLabels(labels, start_index, count);
if (weights) {
InsertWeights(weights, start_index, count);
}
if (init_scores) {
InsertInitScores(init_scores, start_index, count, count);
}
if (queries) {
InsertQueries(queries, start_index, count);
}
}
void Metadata::FinishLoad() {
CalculateQueryBoundaries();
}
#ifdef USE_CUDA_EXP
void Metadata::CreateCUDAMetadata(const int gpu_device_id) {
cuda_metadata_.reset(new CUDAMetadata(gpu_device_id));
@@ -537,7 +672,7 @@ void Metadata::LoadFromMemory(const void* memory) {
(num_queries_ + 1));
query_load_from_file_ = true;
}
LoadQueryWeights();
CalculateQueryWeights();
}
void Metadata::SaveBinaryToFile(const VirtualFileWriter* writer) const {


@@ -81,6 +81,12 @@ class SparseBin : public Bin {
~SparseBin() {}
void InitStreaming(uint32_t num_thread) override {
// Each thread needs its own push buffer, so allocate external num_thread times the number of OMP threads
int num_omp_threads = OMP_NUM_THREADS();
push_buffers_.resize(num_omp_threads * num_thread);
};
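// A quick arithmetic check (hypothetical counts, not from the patch) on the
// sizing above: 4 OMP threads x num_thread = 2 external pushers -> 8 buffers,
// and the c_api-side mapping internal_tid = omp_get_thread_num() +
// num_omp_threads * tid gives each (OMP, external) pair a distinct slot,
// e.g. buffer_slot(3, 1, 4) == 7, the last of the 8.
int buffer_slot(int omp_tid, int external_tid, int num_omp_threads) {
  return omp_tid + num_omp_threads * external_tid;
}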
void ReSize(data_size_t num_data) override { num_data_ = num_data; }
void Push(int tid, data_size_t idx, uint32_t value) override {


@@ -36,6 +36,8 @@
%pointer_cast(int32_t *, void *, int32_t_to_voidp_ptr)
%pointer_cast(int64_t *, void *, int64_t_to_voidp_ptr)
%pointer_cast(void *, double **, void_to_doublep_ptr)
/* Custom pointer manipulation template */
%define %pointer_manipulation(TYPE, NAME)
%{
@@ -99,12 +101,43 @@ void delete_##NAME(TYPE *ary);
TYPE NAME##_getitem(TYPE *ary, int64_t index);
void NAME##_setitem(TYPE *ary, int64_t index, TYPE value);
%enddef
/* Custom template for arrays of pointers */
%define %ptr_array_functions(TYPE,NAME)
%{
static TYPE **new_##NAME(int64_t nelements) { %}
%{ return new TYPE*[nelements](); %}
%{}
static void delete_##NAME(TYPE **ary) { %}
%{ delete [] ary; %}
%{}
static TYPE *NAME##_getitem(TYPE **ary, int64_t index) {
return ary[index];
}
static void NAME##_setitem(TYPE **ary, int64_t index, TYPE *value) {
ary[index] = value;
}
%}
TYPE **new_##NAME(int64_t nelements);
void delete_##NAME(TYPE **ary);
TYPE *NAME##_getitem(TYPE **ary, int64_t index);
void NAME##_setitem(TYPE **ary, int64_t index, TYPE *value);
%enddef
%long_array_functions(uint8_t, byteArray)
%long_array_functions(double, doubleArray)
%long_array_functions(float, floatArray)
%long_array_functions(int, intArray)
%long_array_functions(long, longArray)
%long_array_functions(int32_t, intArray)
%long_array_functions(int64_t, longArray)
%ptr_array_functions(void, voidPtrArray)
%ptr_array_functions(double, doublePtrArray)
%ptr_array_functions(int, intPtrArray)
%pointer_manipulation(void*, voidpp)


@@ -0,0 +1,335 @@
/*!
* Copyright (c) 2022 Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE file in the project root for license information.
*/
#include <gtest/gtest.h>
#include <testutils.h>
#include <LightGBM/utils/log.h>
#include <LightGBM/c_api.h>
#include <LightGBM/dataset.h>
#include <iostream>
using LightGBM::Dataset;
using LightGBM::Log;
using LightGBM::TestUtils;
void test_stream_dense(
int8_t creation_type,
DatasetHandle ref_datset_handle,
int32_t nrows,
int32_t ncols,
int32_t nclasses,
int batch_count,
const std::vector<double>* features,
const std::vector<float>* labels,
const std::vector<float>* weights,
const std::vector<double>* init_scores,
const std::vector<int32_t>* groups) {
Log::Info("Streaming %d rows dense data with a batch size of %d", nrows, batch_count);
DatasetHandle dataset_handle = nullptr;
Dataset* dataset = nullptr;
int has_weights = weights != nullptr;
int has_init_scores = init_scores != nullptr;
int has_queries = groups != nullptr;
try {
int result = 0;
switch (creation_type) {
case 0: {
Log::Info("Creating Dataset using LGBM_DatasetCreateFromSampledColumn, %d rows dense data with a batch size of %d", nrows, batch_count);
// construct sample data first (use all data for convenience and since size is small)
std::vector<std::vector<double>> sample_values(ncols);
std::vector<std::vector<int>> sample_idx(ncols);
const double* current_val = features->data();
for (int32_t idx = 0; idx < nrows; ++idx) {
for (int32_t k = 0; k < ncols; ++k) {
if (std::fabs(*current_val) > 1e-35f || std::isnan(*current_val)) {
sample_values[k].emplace_back(*current_val);
sample_idx[k].emplace_back(static_cast<int>(idx));
}
current_val++;
}
}
std::vector<int> sample_sizes;
std::vector<double*> sample_values_ptrs;
std::vector<int*> sample_idx_ptrs;
for (int32_t i = 0; i < ncols; ++i) {
sample_values_ptrs.push_back(sample_values[i].data());
sample_idx_ptrs.push_back(sample_idx[i].data());
sample_sizes.push_back(static_cast<int>(sample_values[i].size()));
}
result = LGBM_DatasetCreateFromSampledColumn(
sample_values_ptrs.data(),
sample_idx_ptrs.data(),
ncols,
sample_sizes.data(),
nrows,
nrows,
nrows,
"max_bin=15",
&dataset_handle);
EXPECT_EQ(0, result) << "LGBM_DatasetCreateFromSampledColumn result code: " << result;
result = LGBM_DatasetInitStreaming(dataset_handle, has_weights, has_init_scores, has_queries, nclasses, 1);
EXPECT_EQ(0, result) << "LGBM_DatasetInitStreaming result code: " << result;
break;
}
case 1:
Log::Info("Creating Dataset using LGBM_DatasetCreateByReference, %d rows dense data with a batch size of %d", nrows, batch_count);
result = LGBM_DatasetCreateByReference(ref_datset_handle, nrows, &dataset_handle);
EXPECT_EQ(0, result) << "LGBM_DatasetCreateByReference result code: " << result;
break;
}
dataset = static_cast<Dataset*>(dataset_handle);
Log::Info("Streaming dense dataset, %d rows dense data with a batch size of %d", nrows, batch_count);
TestUtils::StreamDenseDataset(
dataset_handle,
nrows,
ncols,
nclasses,
batch_count,
features,
labels,
weights,
init_scores,
groups);
dataset->FinishLoad();
TestUtils::AssertMetadata(&dataset->metadata(),
labels,
weights,
init_scores,
groups);
}
catch (...) {
}
if (dataset_handle) {
int result = LGBM_DatasetFree(dataset_handle);
EXPECT_EQ(0, result) << "LGBM_DatasetFree result code: " << result;
}
}
void test_stream_sparse(
int8_t creation_type,
DatasetHandle ref_datset_handle,
int32_t nrows,
int32_t ncols,
int32_t nclasses,
int batch_count,
const std::vector<int32_t>* indptr,
const std::vector<int32_t>* indices,
const std::vector<double>* vals,
const std::vector<float>* labels,
const std::vector<float>* weights,
const std::vector<double>* init_scores,
const std::vector<int32_t>* groups) {
Log::Info("Streaming %d rows sparse data with a batch size of %d", nrows, batch_count);
DatasetHandle dataset_handle = nullptr;
Dataset* dataset = nullptr;
int has_weights = weights != nullptr;
int has_init_scores = init_scores != nullptr;
int has_queries = groups != nullptr;
try {
int result = 0;
switch (creation_type) {
case 0: {
Log::Info("Creating Dataset using LGBM_DatasetCreateFromSampledColumn, %d rows sparse data with a batch size of %d", nrows, batch_count);
std::vector<std::vector<double>> sample_values(ncols);
std::vector<std::vector<int>> sample_idx(ncols);
for (size_t i = 0; i < indptr->size() - 1; ++i) {
int start_index = indptr->at(i);
int stop_index = indptr->at(i + 1);
for (int32_t j = start_index; j < stop_index; ++j) {
auto val = vals->at(j);
auto idx = indices->at(j);
if (std::fabs(val) > 1e-35f || std::isnan(val)) {
sample_values[idx].emplace_back(val);
sample_idx[idx].emplace_back(static_cast<int>(i));
}
}
}
std::vector<int> sample_sizes;
std::vector<double*> sample_values_ptrs;
std::vector<int*> sample_idx_ptrs;
for (int32_t i = 0; i < ncols; ++i) {
sample_values_ptrs.push_back(sample_values[i].data());
sample_idx_ptrs.push_back(sample_idx[i].data());
sample_sizes.push_back(static_cast<int>(sample_values[i].size()));
}
result = LGBM_DatasetCreateFromSampledColumn(
sample_values_ptrs.data(),
sample_idx_ptrs.data(),
ncols,
sample_sizes.data(),
nrows,
nrows,
nrows,
"max_bin=15",
&dataset_handle);
EXPECT_EQ(0, result) << "LGBM_DatasetCreateFromSampledColumn result code: " << result;
dataset = static_cast<Dataset*>(dataset_handle);
dataset->InitStreaming(nrows, has_weights, has_init_scores, has_queries, nclasses, 1);
break;
}
case 1:
Log::Info("Creating Dataset using LGBM_DatasetCreateByReference, %d rows sparse data with a batch size of %d", nrows, batch_count);
result = LGBM_DatasetCreateByReference(ref_datset_handle, nrows, &dataset_handle);
EXPECT_EQ(0, result) << "LGBM_DatasetCreateByReference result code: " << result;
break;
}
dataset = static_cast<Dataset*>(dataset_handle);
TestUtils::StreamSparseDataset(
dataset_handle,
nrows,
nclasses,
batch_count,
indptr,
indices,
vals,
labels,
weights,
init_scores,
groups);
dataset->FinishLoad();
Log::Info("Streaming sparse dataset, %d rows sparse data with a batch size of %d", nrows, batch_count);
TestUtils::AssertMetadata(&dataset->metadata(),
labels,
weights,
init_scores,
groups);
}
catch (...) {
}
if (dataset_handle) {
int result = LGBM_DatasetFree(dataset_handle);
EXPECT_EQ(0, result) << "LGBM_DatasetFree result code: " << result;
}
}
TEST(Stream, PushDenseRowsWithMetadata) {
// Load some test data
DatasetHandle ref_datset_handle;
const char* params = "max_bin=15";
// Use the smaller ".test" data since we don't care about the actual contents
int result = TestUtils::LoadDatasetFromExamples("binary_classification/binary.test", params, &ref_datset_handle);
EXPECT_EQ(0, result) << "LoadDatasetFromExamples result code: " << result;
Dataset* ref_dataset = static_cast<Dataset*>(ref_datset_handle);
auto noriginalrows = ref_dataset->num_data();
Log::Info("Row count: %d", noriginalrows);
Log::Info("Feature group count: %d", ref_dataset->num_features());
// Add some fake initial_scores and groups so we can test streaming them
int nclasses = 2; // choose > 1 just to test multi-class handling
std::vector<double> unused_init_scores;
unused_init_scores.resize(noriginalrows * nclasses);
std::vector<int32_t> unused_groups;
unused_groups.assign(noriginalrows, 1);
result = LGBM_DatasetSetField(ref_datset_handle, "init_score", unused_init_scores.data(), noriginalrows * nclasses, 1);
EXPECT_EQ(0, result) << "LGBM_DatasetSetField init_score result code: " << result;
result = LGBM_DatasetSetField(ref_datset_handle, "group", unused_groups.data(), noriginalrows, 2);
EXPECT_EQ(0, result) << "LGBM_DatasetSetField group result code: " << result;
// Now use the reference dataset schema to make some testable Datasets with N rows each
int32_t nrows = 1000;
int32_t ncols = ref_dataset->num_features();
std::vector<double> features;
std::vector<float> labels;
std::vector<float> weights;
std::vector<double> init_scores;
std::vector<int32_t> groups;
Log::Info("Creating random data");
TestUtils::CreateRandomDenseData(nrows, ncols, nclasses, &features, &labels, &weights, &init_scores, &groups);
const std::vector<int32_t> batch_counts = { 1, nrows / 100, nrows / 10, nrows };
const std::vector<int8_t> creation_types = { 0, 1 };
for (size_t i = 0; i < creation_types.size(); ++i) { // from sampled data or reference
for (size_t j = 0; j < batch_counts.size(); ++j) {
auto type = creation_types[i];
auto batch_count = batch_counts[j];
test_stream_dense(type, ref_datset_handle, nrows, ncols, nclasses, batch_count, &features, &labels, &weights, &init_scores, &groups);
}
}
result = LGBM_DatasetFree(ref_datset_handle);
EXPECT_EQ(0, result) << "LGBM_DatasetFree result code: " << result;
}
TEST(Stream, PushSparseRowsWithMetadata) {
// Load some test data
DatasetHandle ref_datset_handle;
const char* params = "max_bin=15";
// Use the smaller ".test" data since we don't care about the actual contents
int result = TestUtils::LoadDatasetFromExamples("binary_classification/binary.test", params, &ref_datset_handle);
EXPECT_EQ(0, result) << "LoadDatasetFromExamples result code: " << result;
Dataset* ref_dataset = static_cast<Dataset*>(ref_datset_handle);
auto noriginalrows = ref_dataset->num_data();
Log::Info("Row count: %d", noriginalrows);
Log::Info("Feature group count: %d", ref_dataset->num_features());
// Add some fake initial_scores and groups so we can test streaming them
int32_t nclasses = 2;
std::vector<double> unused_init_scores;
unused_init_scores.resize(noriginalrows * nclasses);
std::vector<int32_t> unused_groups;
unused_groups.assign(noriginalrows, 1);
result = LGBM_DatasetSetField(ref_datset_handle, "init_score", unused_init_scores.data(), noriginalrows * nclasses, 1);
EXPECT_EQ(0, result) << "LGBM_DatasetSetField init_score result code: " << result;
result = LGBM_DatasetSetField(ref_datset_handle, "group", unused_groups.data(), noriginalrows, 2);
EXPECT_EQ(0, result) << "LGBM_DatasetSetField group result code: " << result;
// Now use the reference dataset schema to make some testable Datasets with N rows each
int32_t nrows = 1000;
int32_t ncols = ref_dataset->num_features();
std::vector<int32_t> indptr;
std::vector<int32_t> indices;
std::vector<double> vals;
std::vector<float> labels;
std::vector<float> weights;
std::vector<double> init_scores;
std::vector<int32_t> groups;
Log::Info("Creating random data");
float sparse_percent = .1f;
TestUtils::CreateRandomSparseData(nrows, ncols, nclasses, sparse_percent, &indptr, &indices, &vals, &labels, &weights, &init_scores, &groups);
const std::vector<int32_t> batch_counts = { 1, nrows / 100, nrows / 10, nrows };
const std::vector<int8_t> creation_types = { 0, 1 };
for (size_t i = 0; i < creation_types.size(); ++i) { // from sampled data or reference
for (size_t j = 0; j < batch_counts.size(); ++j) {
auto type = creation_types[i];
auto batch_count = batch_counts[j];
test_stream_sparse(type, ref_datset_handle, nrows, ncols, nclasses, batch_count, &indptr, &indices, &vals, &labels, &weights, &init_scores, &groups);
}
}
result = LGBM_DatasetFree(ref_datset_handle);
EXPECT_EQ(0, result) << "LGBM_DatasetFree result code: " << result;
}


@@ -0,0 +1,379 @@
/*!
* Copyright (c) 2022 Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE file in the project root for license information.
*/
#include <testutils.h>
#include <LightGBM/c_api.h>
#include <LightGBM/utils/random.h>
#include <gtest/gtest.h>
#include <string>
using LightGBM::Log;
using LightGBM::Random;
namespace LightGBM {
/*!
* Creates a Dataset from the internal repository examples.
*/
int TestUtils::LoadDatasetFromExamples(const char* filename, const char* config, DatasetHandle* out) {
std::string fullPath("../examples/");
fullPath += filename;
Log::Info("Debug sample data path: %s", fullPath.c_str());
return LGBM_DatasetCreateFromFile(
fullPath.c_str(),
config,
nullptr,
out);
}
/*!
* Creates fake data in the passed vectors.
*/
void TestUtils::CreateRandomDenseData(
int32_t nrows,
int32_t ncols,
int32_t nclasses,
std::vector<double>* features,
std::vector<float>* labels,
std::vector<float>* weights,
std::vector<double>* init_scores,
std::vector<int32_t>* groups) {
Random rand(42);
features->reserve(nrows * ncols);
for (int32_t row = 0; row < nrows; row++) {
for (int32_t col = 0; col < ncols; col++) {
features->push_back(rand.NextFloat());
}
}
CreateRandomMetadata(nrows, nclasses, labels, weights, init_scores, groups);
}
/*!
* Creates fake data in the passed vectors.
*/
void TestUtils::CreateRandomSparseData(
int32_t nrows,
int32_t ncols,
int32_t nclasses,
float sparse_percent,
std::vector<int32_t>* indptr,
std::vector<int32_t>* indices,
std::vector<double>* values,
std::vector<float>* labels,
std::vector<float>* weights,
std::vector<double>* init_scores,
std::vector<int32_t>* groups) {
Random rand(42);
indptr->reserve(static_cast<int32_t>(nrows + 1));
indices->reserve(static_cast<int32_t>(sparse_percent * nrows * ncols));
values->reserve(static_cast<int32_t>(sparse_percent * nrows * ncols));
indptr->push_back(0);
for (int32_t row = 0; row < nrows; row++) {
for (int32_t col = 0; col < ncols; col++) {
float rnd = rand.NextFloat();
if (rnd < sparse_percent) {
indices->push_back(col);
values->push_back(rand.NextFloat());
}
}
indptr->push_back(static_cast<int32_t>(indices->size() - 1));
}
CreateRandomMetadata(nrows, nclasses, labels, weights, init_scores, groups);
}
/*!
* Creates fake data in the passed vectors.
*/
void TestUtils::CreateRandomMetadata(int32_t nrows,
int32_t nclasses,
std::vector<float>* labels,
std::vector<float>* weights,
std::vector<double>* init_scores,
std::vector<int32_t>* groups) {
Random rand(42);
labels->reserve(nrows);
if (weights) {
weights->reserve(nrows);
}
if (init_scores) {
init_scores->reserve(nrows * nclasses);
}
if (groups) {
groups->reserve(nrows);
}
int32_t group = 0;
for (int32_t row = 0; row < nrows; row++) {
labels->push_back(rand.NextFloat());
if (weights) {
weights->push_back(rand.NextFloat());
}
if (init_scores) {
for (int32_t i = 0; i < nclasses; i++) {
init_scores->push_back(rand.NextFloat());
}
}
if (groups) {
if (rand.NextFloat() > 0.95) {
group++;
}
groups->push_back(group);
}
}
}
void TestUtils::StreamDenseDataset(DatasetHandle dataset_handle,
int32_t nrows,
int32_t ncols,
int32_t nclasses,
int32_t batch_count,
const std::vector<double>* features,
const std::vector<float>* labels,
const std::vector<float>* weights,
const std::vector<double>* init_scores,
const std::vector<int32_t>* groups) {
int result = LGBM_DatasetSetWaitForManualFinish(dataset_handle, 1);
EXPECT_EQ(0, result) << "LGBM_DatasetSetWaitForManualFinish result code: " << result;
Log::Info(" Begin StreamDenseDataset");
if ((nrows % batch_count) != 0) {
Log::Fatal("This utility method only handles nrows that are a multiple of batch_count");
}
const double* features_ptr = features->data();
const float* labels_ptr = labels->data();
const float* weights_ptr = nullptr;
if (weights) {
weights_ptr = weights->data();
}
// Since init_scores are in a column format, but need to be pushed as rows, we have to extract each batch
std::vector<double> init_score_batch;
const double* init_scores_ptr = nullptr;
if (init_scores) {
init_score_batch.reserve(nclasses * batch_count);
init_scores_ptr = init_score_batch.data();
}
const int32_t* groups_ptr = nullptr;
if (groups) {
groups_ptr = groups->data();
}
auto start_time = std::chrono::steady_clock::now();
for (int32_t i = 0; i < nrows; i += batch_count) {
if (init_scores) {
init_scores_ptr = CreateInitScoreBatch(&init_score_batch, i, nrows, nclasses, batch_count, init_scores);
}
result = LGBM_DatasetPushRowsWithMetadata(dataset_handle,
features_ptr,
1,
batch_count,
ncols,
i,
labels_ptr,
weights_ptr,
init_scores_ptr,
groups_ptr,
0);
EXPECT_EQ(0, result) << "LGBM_DatasetPushRowsWithMetadata result code: " << result;
if (result != 0) {
FAIL() << "LGBM_DatasetPushRowsWithMetadata failed"; // This forces an immediate failure, which EXPECT_EQ does not
}
features_ptr += batch_count * ncols;
labels_ptr += batch_count;
if (weights_ptr) {
weights_ptr += batch_count;
}
if (groups_ptr) {
groups_ptr += batch_count;
}
}
auto cur_time = std::chrono::steady_clock::now();
Log::Info(" Time: %d", cur_time - start_time);
}
void TestUtils::StreamSparseDataset(DatasetHandle dataset_handle,
int32_t nrows,
int32_t nclasses,
int32_t batch_count,
const std::vector<int32_t>* indptr,
const std::vector<int32_t>* indices,
const std::vector<double>* values,
const std::vector<float>* labels,
const std::vector<float>* weights,
const std::vector<double>* init_scores,
const std::vector<int32_t>* groups) {
int result = LGBM_DatasetSetWaitForManualFinish(dataset_handle, 1);
EXPECT_EQ(0, result) << "LGBM_DatasetSetWaitForManualFinish result code: " << result;
Log::Info(" Begin StreamSparseDataset");
if ((nrows % batch_count) != 0) {
Log::Fatal("This utility method only handles nrows that are a multiple of batch_count");
}
const int32_t* indptr_ptr = indptr->data();
const int32_t* indices_ptr = indices->data();
const double* values_ptr = values->data();
const float* labels_ptr = labels->data();
const float* weights_ptr = nullptr;
if (weights) {
weights_ptr = weights->data();
}
// Since init_scores are in a column format, but need to be pushed as rows, we have to extract each batch
std::vector<double> init_score_batch;
const double* init_scores_ptr = nullptr;
if (init_scores) {
init_score_batch.reserve(nclasses * batch_count);
init_scores_ptr = init_score_batch.data();
}
const int32_t* groups_ptr = nullptr;
if (groups) {
groups_ptr = groups->data();
}
auto start_time = std::chrono::steady_clock::now();
for (int32_t i = 0; i < nrows; i += batch_count) {
if (init_scores) {
init_scores_ptr = CreateInitScoreBatch(&init_score_batch, i, nrows, nclasses, batch_count, init_scores);
}
int32_t nelem = indptr->at(i + batch_count - 1) - indptr->at(i);
result = LGBM_DatasetPushRowsByCSRWithMetadata(dataset_handle,
indptr_ptr,
2,
indices_ptr,
values_ptr,
1,
batch_count + 1,
nelem,
i,
labels_ptr,
weights_ptr,
init_scores_ptr,
groups_ptr,
0);
EXPECT_EQ(0, result) << "LGBM_DatasetPushRowsByCSRWithMetadata result code: " << result;
if (result != 0) {
FAIL() << "LGBM_DatasetPushRowsByCSRWithMetadata failed"; // This forces an immediate failure, which EXPECT_EQ does not
}
indptr_ptr += batch_count;
labels_ptr += batch_count;
if (weights_ptr) {
weights_ptr += batch_count;
}
if (groups_ptr) {
groups_ptr += batch_count;
}
}
auto cur_time = std::chrono::steady_clock::now();
Log::Info(" Time: %d", cur_time - start_time);
}
void TestUtils::AssertMetadata(const Metadata* metadata,
const std::vector<float>* ref_labels,
const std::vector<float>* ref_weights,
const std::vector<double>* ref_init_scores,
const std::vector<int32_t>* ref_groups) {
const float* labels = metadata->label();
auto nTotal = static_cast<int32_t>(ref_labels->size());
for (auto i = 0; i < nTotal; i++) {
EXPECT_EQ(ref_labels->at(i), labels[i]) << "Inserted data: " << ref_labels->at(i);
if (ref_labels->at(i) != labels[i]) {
FAIL() << "Mismatched labels"; // This forces an immediate failure, which EXPECT_EQ does not
}
}
const float* weights = metadata->weights();
if (weights) {
if (!ref_weights) {
FAIL() << "Expected null weights";
}
for (auto i = 0; i < nTotal; i++) {
EXPECT_EQ(ref_weights->at(i), weights[i]) << "Inserted data: " << ref_weights->at(i);
if (ref_weights->at(i) != weights[i]) {
FAIL() << "Mismatched weights"; // This forces an immediate failure, which EXPECT_EQ does not
}
}
} else if (ref_weights) {
FAIL() << "Expected non-null weights";
}
const double* init_scores = metadata->init_score();
if (init_scores) {
if (!ref_init_scores) {
FAIL() << "Expected null init_scores";
}
for (size_t i = 0; i < ref_init_scores->size(); i++) {
EXPECT_EQ(ref_init_scores->at(i), init_scores[i]) << "Inserted data: " << ref_init_scores->at(i) << " Index: " << i;
if (ref_init_scores->at(i) != init_scores[i]) {
FAIL() << "Mismatched init_scores"; // This forces an immediate failure, which EXPECT_EQ does not
}
}
} else if (ref_init_scores) {
FAIL() << "Expected non-null init_scores";
}
const int32_t* query_boundaries = metadata->query_boundaries();
if (query_boundaries) {
if (!ref_groups) {
FAIL() << "Expected null query_boundaries";
}
// Calculate expected boundaries
std::vector<int32_t> ref_query_boundaries;
ref_query_boundaries.push_back(0);
int group_val = ref_groups->at(0);
for (auto i = 1; i < nTotal; i++) {
if (ref_groups->at(i) != group_val) {
ref_query_boundaries.push_back(i);
group_val = ref_groups->at(i);
}
}
ref_query_boundaries.push_back(nTotal);
for (size_t i = 0; i < ref_query_boundaries.size(); i++) {
EXPECT_EQ(ref_query_boundaries[i], query_boundaries[i]) << "Inserted data: " << ref_query_boundaries[i];
if (ref_query_boundaries[i] != query_boundaries[i]) {
FAIL() << "Mismatched query_boundaries"; // This forces an immediate failure, which EXPECT_EQ does not
}
}
} else if (ref_groups) {
FAIL() << "Expected non-null query_boundaries";
}
}
const double* TestUtils::CreateInitScoreBatch(std::vector<double>* init_score_batch,
int32_t index,
int32_t nrows,
int32_t nclasses,
int32_t batch_count,
const std::vector<double>* original_init_scores) {
// Extract a set of rows from the column-based format (still maintaining column based format)
init_score_batch->clear();
for (int32_t c = 0; c < nclasses; c++) {
for (int32_t row = index; row < index + batch_count; row++) {
init_score_batch->push_back(original_init_scores->at(row + nrows * c));
}
}
return init_score_batch->data();
}
} // namespace LightGBM

tests/cpp_tests/testutils.h (new file, 108 lines)

@@ -0,0 +1,108 @@
/*!
* Copyright (c) 2022 Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE file in the project root for license information.
*/
#ifndef LIGHTGBM_TESTUTILS_H_
#define LIGHTGBM_TESTUTILS_H_
#include <LightGBM/c_api.h>
#include <LightGBM/dataset.h>
#include <vector>
using LightGBM::Metadata;
namespace LightGBM {
class TestUtils {
public:
/*!
* Creates a Dataset from the internal repository examples.
*/
static int LoadDatasetFromExamples(const char* filename, const char* config, DatasetHandle* out);
/*!
* Creates a dense Dataset of random values.
*/
static void CreateRandomDenseData(int32_t nrows,
int32_t ncols,
int32_t nclasses,
std::vector<double>* features,
std::vector<float>* labels,
std::vector<float>* weights,
std::vector<double>* init_scores,
std::vector<int32_t>* groups);
/*!
* Creates a CSR sparse Dataset of random values.
*/
static void CreateRandomSparseData(int32_t nrows,
int32_t ncols,
int32_t nclasses,
float sparse_percent,
std::vector<int32_t>* indptr,
std::vector<int32_t>* indices,
std::vector<double>* values,
std::vector<float>* labels,
std::vector<float>* weights,
std::vector<double>* init_scores,
std::vector<int32_t>* groups);
/*!
* Creates a batch of Metadata of random values.
*/
static void CreateRandomMetadata(int32_t nrows,
int32_t nclasses,
std::vector<float>* labels,
std::vector<float>* weights,
std::vector<double>* init_scores,
std::vector<int32_t>* groups);
/*!
* Pushes nrows of data to a Dataset in batches of batch_count.
*/
static void StreamDenseDataset(DatasetHandle dataset_handle,
int32_t nrows,
int32_t ncols,
int32_t nclasses,
int32_t batch_count,
const std::vector<double>* features,
const std::vector<float>* labels,
const std::vector<float>* weights,
const std::vector<double>* init_scores,
const std::vector<int32_t>* groups);
/*!
* Pushes nrows of data to a Dataset in batches of batch_count.
*/
static void StreamSparseDataset(DatasetHandle dataset_handle,
int32_t nrows,
int32_t nclasses,
int32_t batch_count,
const std::vector<int32_t>* indptr,
const std::vector<int32_t>* indices,
const std::vector<double>* values,
const std::vector<float>* labels,
const std::vector<float>* weights,
const std::vector<double>* init_scores,
const std::vector<int32_t>* groups);
/*!
* Validates metadata against reference vectors.
*/
static void AssertMetadata(const Metadata* metadata,
const std::vector<float>* labels,
const std::vector<float>* weights,
const std::vector<double>* init_scores,
const std::vector<int32_t>* groups);
static const double* CreateInitScoreBatch(std::vector<double>* init_score_batch,
int32_t index,
int32_t nrows,
int32_t nclasses,
int32_t batch_count,
const std::vector<double>* original_init_scores);
};
} // namespace LightGBM
#endif // LIGHTGBM_TESTUTILS_H_