* add openmp support for parallelization
* add bin counting for debugging

* add algo precision test

* remove parallelization messing with model weights

* fix windows/mac build
Markus Cozowicz 2021-12-10 20:59:22 +01:00, committed by GitHub
Parent: cdfba6331f
Commit: 038b422cc5
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
7 changed files: 218 additions and 111 deletions

@@ -24,7 +24,8 @@ class CBM(BaseEstimator):
single_update_per_iteration:bool = True,
date_features: Union[str, List[str]] = 'day,month',
binning: Union[int, lambda x: int] = 10,
metric: str = 'rmse'
metric: str = 'rmse',
enable_bin_count: bool = False
) -> None:
"""Initialize the CBM model.
@@ -45,6 +46,7 @@ class CBM(BaseEstimator):
self.min_iterations_early_stopping = min_iterations_early_stopping
self.epsilon_early_stopping = epsilon_early_stopping
self.single_update_per_iteration = single_update_per_iteration
self.enable_bin_count = enable_bin_count
# let's make sure it's serializable
if isinstance(date_features, list):
@@ -95,7 +97,7 @@ class CBM(BaseEstimator):
# deal with continuous features
bin_num = self.binning if isinstance(self.binning, int) else self.binning(X[col])
X_binned, bins = pd.qcut(X[col].fillna(0), bin_num, retbins=True)
X_binned, bins = pd.qcut(X[col].fillna(0), bin_num, duplicates='drop', retbins=True)
self._feature_names.append(col)
self._feature_categories.append(X_binned.cat.categories.astype(str).tolist())
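
The duplicates='drop' argument matters for skewed columns: when one value dominates a feature, several quantile edges coincide and pd.qcut raises ValueError unless duplicate edges are dropped (at the cost of fewer bins). A minimal standalone illustration, with synthetic data not from this repo:

import pandas as pd

# a zero-inflated feature, similar in spirit to the PRECIP column used in the tests below
s = pd.Series([0, 0, 0, 0, 0, 0, 1, 2, 3, 4])

# pd.qcut(s, 5, retbins=True) raises ValueError: bin edges must be unique
binned, bins = pd.qcut(s, 5, duplicates='drop', retbins=True)
print(len(bins))  # fewer than the 6 requested edges; duplicate quantiles were merged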
@@ -159,7 +161,8 @@ class CBM(BaseEstimator):
self.min_iterations_early_stopping,
self.epsilon_early_stopping,
self.single_update_per_iteration,
self.metric
self.metric,
self.enable_bin_count
)
self.is_fitted_ = True
@@ -359,3 +362,7 @@ class CBM(BaseEstimator):
@property
def iterations(self):
return self._cpp.iterations
@property
def bin_count(self):
return self._cpp.bin_count
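
A usage sketch for the new flag and property (synthetic data; assumes, as in the tests below, that fit is given pre-binned integer features):

import numpy as np
import cbm

x = np.random.randint(0, 10, size=(1000, 3)).astype('uint8')  # pre-binned features
y = np.random.poisson(5.0, size=1000).astype('uint32')

model = cbm.CBM(enable_bin_count=True)
model.fit(x, y)

# bin_count[j][k] is the number of training examples in bin k of feature j,
# handy for spotting empty or sparsely populated bins while debugging
for j, counts in enumerate(model.bin_count):
    print(f'feature {j}: {counts}')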


@@ -21,8 +21,13 @@ def get_extra_compile_args():
if cflags is None:
cflags = ""
return cflags.split() \
+ ["-std=c++11", "-Wall", "-Wextra", "-march=native", "-msse2", "-ffast-math", "-mfpmath=sse"] #, "-fopenmp"]
cflags = cflags.split() \
+ ["-std=c++11", "-Wall", "-Wextra", "-march=native", "-msse2", "-ffast-math", "-mfpmath=sse"]
if platform.system() == "Linux":
cflags += ["-fopenmp", "-lgomp"]
return cflags
def get_libraries():
if platform.system() == "Windows":
@@ -61,7 +66,6 @@ setup(
'interactive': ['matplotlib>=2.2.0'],
},
packages=["cbm"],
# package_data={ "cbm": ["src/pycbm.h", "src/cbm.h"] },
ext_modules=[
Extension(
"cbm_cpp",
@@ -70,6 +74,7 @@ setup(
extra_compile_args=get_extra_compile_args(),
libraries=get_libraries(),
language="c++11",
extra_link_args=['-fopenmp'] if platform.system() == "Linux" else []
)
],
headers=["src/pycbm.h", "src/cbm.h"],
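
Since OpenMP is only enabled on Linux here, a quick way to sanity-check the parallel build is the standard OMP_NUM_THREADS environment variable; a rough, illustrative timing sketch (data and sizes are arbitrary):

import os
os.environ['OMP_NUM_THREADS'] = '1'  # set before the extension is loaded

import time
import numpy as np
import cbm

x = np.random.randint(0, 10, size=(500_000, 4)).astype('uint8')
y = np.random.poisson(3.0, size=500_000).astype('uint32')

start = time.perf_counter()
cbm.CBM().fit(x, y)
print(f'fit with OMP_NUM_THREADS=1: {time.perf_counter() - start:.2f}s')
# rerun with the variable unset (or set > 1) and compare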


@@ -39,6 +39,10 @@ namespace cbm
return _iterations;
}
const std::vector<std::vector<uint32_t>> & CBM::get_bin_count() const {
return _bin_count;
}
void CBM::fit(
const uint32_t *y,
const char *x_data,
@@ -54,18 +58,28 @@ namespace cbm
double epsilon_early_stopping,
bool single_update_per_iteration,
uint8_t x_bytes_per_feature,
float (*metric)(const uint32_t*, const double*, size_t n_examples))
float (*metric)(const uint32_t*, const double*, size_t n_examples),
bool enable_bin_count)
{
switch (x_bytes_per_feature)
{
case 1:
fit_internal<uint8_t>(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric);
if (enable_bin_count)
fit_internal<uint8_t, true>(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric);
else
fit_internal<uint8_t, false>(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric);
break;
case 2:
fit_internal<uint16_t>(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric);
if (enable_bin_count)
fit_internal<uint16_t, true>(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric);
else
fit_internal<uint16_t, false>(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric);
break;
case 4:
fit_internal<uint32_t>(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric);
if (enable_bin_count)
fit_internal<uint32_t, true>(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric);
else
fit_internal<uint32_t, false>(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric);
break;
}
}
@@ -73,6 +87,7 @@ namespace cbm
float metric_RMSE(const uint32_t* y, const double* y_hat, size_t n_examples)
{
double rmse = 0;
#pragma omp parallel for schedule(static, 10000) reduction(+: rmse)
for (size_t i = 0; i < n_examples; i++)
rmse += (y_hat[i] - y[i]) * (y_hat[i] - y[i]);
@@ -82,6 +97,7 @@ namespace cbm
float metric_SMAPE(const uint32_t* y, const double* y_hat, size_t n_examples)
{
double smape = 0;
#pragma omp parallel for schedule(static, 10000) reduction(+: smape)
for (size_t i = 0; i < n_examples; i++) {
if (y[i] == 0 && y_hat[i] == 0)
continue;
@@ -95,9 +111,10 @@ namespace cbm
float metric_L1(const uint32_t* y, const double* y_hat, size_t n_examples)
{
double l1 = 0;
#pragma omp parallel for schedule(static, 10000) reduction(+: l1)
for (size_t i = 0; i < n_examples; i++)
l1 += std::abs(y_hat[i] - y[i]);
return l1;
return l1 / n_examples;
}
}
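
For reference, numpy equivalents of the three metrics; the RMSE normalization and the SMAPE body are partly elided in these hunks, so the exact formulas below are assumptions based on the standard definitions. Note that metric_L1 now returns the mean absolute error instead of the raw sum:

import numpy as np

def rmse(y, y_hat):
    return np.sqrt(np.mean((y_hat - y) ** 2))

def smape(y, y_hat):
    both_zero = (y == 0) & (y_hat == 0)  # skipped in the C++ loop above
    y, y_hat = y[~both_zero].astype(float), y_hat[~both_zero]
    return np.mean(np.abs(y_hat - y) / ((np.abs(y) + np.abs(y_hat)) / 2))

def l1(y, y_hat):
    return np.mean(np.abs(y_hat - y))  # previously the un-normalized sum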

src/cbm.h

@@ -9,6 +9,10 @@
#include <cmath>
#include <algorithm>
#include <functional>
#include <iostream>
// #include <omp.h>
// #include <chrono>
// using namespace std::chrono;
namespace cbm
{
@@ -23,33 +27,48 @@ namespace cbm
 double _y_mean;
 size_t _iterations;
+std::vector<std::vector<uint32_t>> _bin_count;
+
+template<typename T>
+void update_y_hat(
+    std::vector<double>& y_hat,
+    std::vector<std::vector<T>> &x,
+    size_t n_examples,
+    size_t n_features)
+{
+    // predict
+    y_hat.assign(n_examples, _y_mean);
+
+    #pragma omp parallel for schedule(static, 10000)
+    for (size_t i = 0; i < n_examples; i++)
+        for (size_t j = 0; j < n_features; j++)
+            y_hat[i] *= _f[j][x[j][i]];
+}

 template<typename T>
 void update_y_hat_sum(
+    std::vector<double>& y_hat,
     std::vector<std::vector<uint64_t>> &y_hat_sum,
     std::vector<std::vector<T>> &x,
     size_t n_examples,
     size_t n_features)
 {
+    update_y_hat(y_hat, x, n_examples, n_features);
+
     // reset y_hat_sum
+    #pragma omp parallel for
     for (size_t j = 0; j < n_features; j++)
         std::fill(y_hat_sum[j].begin(), y_hat_sum[j].end(), 0);

-    // compute y_hat and y_hat_sum
-    for (size_t i = 0; i < n_examples; i++)
-    {
-        // TODO: parallelize & vectorize
-        // TODO: use log to stabilize?
-        auto y_hat_i = _y_mean;
-        for (size_t j = 0; j < n_features; j++)
-            y_hat_i *= _f[j][x[j][i]];
-
-        for (size_t j = 0; j < n_features; j++)
-            y_hat_sum[j][x[j][i]] += y_hat_i;
-    }
+    #pragma omp parallel for
+    for (size_t j = 0; j < n_features; j++)
+        for (size_t i = 0; i < n_examples; i++)
+            // TODO: use log to stabilize?
+            y_hat_sum[j][x[j][i]] += y_hat[i];
 }
template<typename T>
template<typename T, bool enableBinCount>
void fit_internal(
const uint32_t *y,
const char *x_data,
@@ -66,104 +85,104 @@ namespace cbm
bool single_update_per_iteration,
float (*metric)(const uint32_t*, const double*, size_t n_examples))
{
-_y_mean = y_mean;
-
-// allocation
-std::vector<std::vector<T>> x(n_features);                // n_features x n_examples
-std::vector<std::vector<T>> g(n_features);                // n_features x max_bin[j] (jagged)
-std::vector<std::vector<uint64_t>> y_sum(n_features);     // n_features x max_bin[j] (jagged)
-std::vector<std::vector<uint64_t>> y_hat_sum(n_features); // n_features x max_bin[j] (jagged)
-// std::vector<std::vector<uint16_t>> bin_count(n_features);
-std::vector<double> y_hat(n_examples);
-
-_f.resize(n_features);
-
-for (size_t j = 0; j < n_features; j++)
-{
-    uint32_t max_bin = x_max[j];
-    g[j].resize(max_bin + 1);
-    _f[j].resize(max_bin + 1, 1);
-    y_sum[j].resize(max_bin + 1);
-    y_hat_sum[j].resize(max_bin + 1);
-    // bin_count[j].resize(max_bin + 1);
-
-    // alloc and store columnar
-    x[j].reserve(n_examples);
-    for (size_t i = 0; i < n_examples; i++)
-    {
-        // https://docs.python.org/3/c-api/buffer.html#complex-arrays
-        // strides are expressed in char not target type
-        T x_ij = *reinterpret_cast<const T *>(x_data + i * x_stride0 + j * x_stride1);
-        x[j].push_back(x_ij);
-        y_sum[j][x_ij] += y[i];
-        // bin_count[j][x_ij]++;
-    }
-}
-
-// iterations
-double learning_rate = learning_rate_step_size;
-double rmse0 = std::numeric_limits<double>::infinity();
-
-for (_iterations = 0; _iterations < max_iterations; _iterations++, learning_rate += learning_rate_step_size)
-{
-    // cap at 1
-    if (learning_rate > 1)
-        learning_rate = 1;
-
-    update_y_hat_sum(y_hat_sum, x, n_examples, n_features);
-
-    // compute g
-    // #pragma omp for // didn't observe improvement
-    #pragma omp parallel for
-    for (size_t j = 0; j < n_features; j++)
-    {
-        for (size_t k = 0; k <= x_max[j]; k++)
-        {
-            // TODO: check if a bin is empty. might be better to remap/exclude the bins?
-            if (y_sum[j][k])
-            {
-                // improve stability
-                double g = (double)y_sum[j][k] / y_hat_sum[j][k]; // eqn. 2 (a)
-
-                // magic numbers found in Regularization section (worsen it quite a bit)
-                // double g = (2.0 * y_sum[j][k]) / (1.67834 * y_hat_sum[j][k]); // eqn. 2 (a)
-
-                if (learning_rate == 1)
-                    _f[j][k] *= g;
-                else
-                    _f[j][k] *= std::exp(learning_rate * std::log(g)); // eqn 2 (b) + eqn 4
-
-                if (!single_update_per_iteration)
-                    update_y_hat_sum(y_hat_sum, x, n_examples, n_features);
-            }
-        }
-    }
-
-    // predict
-    y_hat.assign(n_examples, _y_mean);
-    for (size_t i = 0; i < n_examples; i++)
-        for (size_t j = 0; j < n_features; j++)
-            y_hat[i] *= _f[j][x[j][i]];
-
-    double rmse = metric(y, y_hat.data(), n_examples);
-
-    // check for early stopping
-    // TODO: expose minimum number of rounds
-    if (_iterations > min_iterations_early_stopping &&
-        (rmse > rmse0 || (rmse0 - rmse) < epsilon_early_stopping))
-    {
-        // TODO: record diagnostics?
-        // printf("early stopping %1.4f vs %1.4f after t=%d\n", rmse, rmse0, (int)t);
-        break;
-    }
-    rmse0 = rmse;
-}
+_y_mean = y_mean;
+
+// allocation
+std::vector<std::vector<T>> x(n_features);                // n_features x n_examples
+std::vector<std::vector<T>> g(n_features);                // n_features x max_bin[j] (jagged)
+std::vector<std::vector<uint64_t>> y_sum(n_features);     // n_features x max_bin[j] (jagged)
+std::vector<std::vector<uint64_t>> y_hat_sum(n_features); // n_features x max_bin[j] (jagged)
+std::vector<double> y_hat(n_examples);
+
+if (enableBinCount)
+    _bin_count.resize(n_features);
+
+_f.resize(n_features);
+
+for (size_t j = 0; j < n_features; j++)
+{
+    uint32_t max_bin = x_max[j];
+    g[j].resize(max_bin + 1);
+    _f[j].resize(max_bin + 1, 1);
+    y_sum[j].resize(max_bin + 1);
+    y_hat_sum[j].resize(max_bin + 1);
+    if (enableBinCount)
+        _bin_count[j].resize(max_bin + 1, 0);
+
+    // alloc and store columnar
+    x[j].reserve(n_examples);
+    for (size_t i = 0; i < n_examples; i++)
+    {
+        // https://docs.python.org/3/c-api/buffer.html#complex-arrays
+        // strides are expressed in char not target type
+        T x_ij = *reinterpret_cast<const T *>(x_data + i * x_stride0 + j * x_stride1);
+        x[j].push_back(x_ij);
+        y_sum[j][x_ij] += y[i];
+        if (enableBinCount)
+            _bin_count[j][x_ij]++;
+    }
+}
+
+// iterations
+double learning_rate = learning_rate_step_size;
+double rmse0 = std::numeric_limits<double>::infinity();
+
+for (_iterations = 0; _iterations < max_iterations; _iterations++, learning_rate += learning_rate_step_size)
+{
+    // cap at 1
+    if (learning_rate > 1)
+        learning_rate = 1;
+
+    update_y_hat_sum(y_hat, y_hat_sum, x, n_examples, n_features);
+
+    // compute g
+    for (size_t j = 0; j < n_features; j++)
+    {
+        for (size_t k = 0; k <= x_max[j]; k++)
+        {
+            // TODO: check if a bin is empty. might be better to remap/exclude the bins?
+            if (y_sum[j][k])
+            {
+                // improve stability
+                double g = (double)y_sum[j][k] / y_hat_sum[j][k]; // eqn. 2 (a)
+
+                // magic numbers found in Regularization section (worsen it quite a bit)
+                // double g = (2.0 * y_sum[j][k]) / (1.67834 * y_hat_sum[j][k]); // eqn. 2 (a)
+
+                if (learning_rate == 1)
+                    _f[j][k] *= g;
+                else
+                    _f[j][k] *= std::exp(learning_rate * std::log(g)); // eqn 2 (b) + eqn 4
+
+                if (!single_update_per_iteration) {
+                    update_y_hat_sum(y_hat, y_hat_sum, x, n_examples, n_features);
+                }
+            }
+        }
+    }
+
+    // prediction
+    update_y_hat(y_hat, x, n_examples, n_features);
+
+    double rmse = metric(y, y_hat.data(), n_examples);
+
+    // check for early stopping
+    // TODO: expose minimum number of rounds
+    if (_iterations > min_iterations_early_stopping &&
+        (rmse > rmse0 || (rmse0 - rmse) < epsilon_early_stopping))
+    {
+        // TODO: record diagnostics?
+        // printf("early stopping %1.4f vs %1.4f after t=%d\n", rmse, rmse0, (int)t);
+        break;
+    }
+    rmse0 = rmse;
+}
}
public:
CBM();
@@ -184,7 +203,8 @@ namespace cbm
double epsilon_early_stopping,
bool single_update_per_iteration,
uint8_t x_bytes_per_feature,
float (*metric)(const uint32_t*, const double*, size_t n_examples));
float (*metric)(const uint32_t*, const double*, size_t n_examples),
bool enable_bin_count);
template <bool explain, typename T>
void predict(
@@ -203,7 +223,7 @@ namespace cbm
double *out_y_hat = out_data;
std::fill(out_y_hat, out_y_hat + n_examples, _y_mean);
// TODO: batch parallelization
#pragma omp parallel for schedule(static, 10000)
for (size_t i = 0; i < n_examples; i++)
{
double &y_hat_i = *(out_y_hat + i);
@@ -229,5 +249,7 @@ namespace cbm
void set_y_mean(float mean);
size_t get_iterations() const;
const std::vector<std::vector<uint32_t>> &get_bin_count() const;
};
}
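
The split between update_y_hat and update_y_hat_sum is what makes the OpenMP usage safe: prediction parallelizes over examples (each thread writes its own y_hat[i]) and accumulation parallelizes over features (each thread owns row y_hat_sum[j]), while the weight update itself stays single-threaded, since _f[j][k] *= exp(lr * log(g)), i.e. f <- f * g^lr, reads predictions that depend on all weights. A numpy sketch of what the two helpers compute (illustrative names; columnar layout as in the C++ code):

import numpy as np

rng = np.random.default_rng(0)
n_examples, n_features, n_bins = 1000, 3, 10
x = rng.integers(0, n_bins, size=(n_features, n_examples))  # x[j][i], columnar
f = np.ones((n_features, n_bins))                           # per-bin factors _f
y_mean = 5.0

# update_y_hat: y_hat[i] = y_mean * prod_j f[j, x[j, i]]
y_hat = y_mean * np.prod(f[np.arange(n_features)[:, None], x], axis=0)

# update_y_hat_sum: y_hat_sum[j][k] = sum of y_hat[i] over examples with x[j][i] == k
y_hat_sum = np.stack([np.bincount(x[j], weights=y_hat, minlength=n_bins)
                      for j in range(n_features)])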


@@ -27,7 +27,8 @@ namespace cbm
size_t min_iterations_early_stopping,
double epsilon_early_stopping,
bool single_update_per_iteration,
std::string metric)
std::string metric,
bool enable_bin_count)
{
// can't just compare the format as linux returns I, windows returns L when using astype('uint32')
@@ -114,7 +115,8 @@ namespace cbm
epsilon_early_stopping,
single_update_per_iteration,
(uint8_t)x_info.itemsize,
metric_func);
metric_func,
enable_bin_count);
}
py::array_t<double> PyCBM::predict(py::buffer x_b, bool explain)
@@ -197,6 +199,10 @@ namespace cbm
{
return _cbm.get_iterations();
}
const std::vector<std::vector<uint32_t>> &PyCBM::get_bin_count() const {
return _cbm.get_bin_count();
}
};
PYBIND11_MODULE(cbm_cpp, m)
@@ -210,6 +216,7 @@ PYBIND11_MODULE(cbm_cpp, m)
.def_property("y_mean", &cbm::PyCBM::get_y_mean, &cbm::PyCBM::set_y_mean)
.def_property("weights", &cbm::PyCBM::get_weights, &cbm::PyCBM::set_weights)
.def_property_readonly("iterations", &cbm::PyCBM::get_iterations)
.def_property_readonly("bin_count", &cbm::PyCBM::get_bin_count)
.def(py::pickle(
[](const cbm::PyCBM &p) { // __getstate__
/* TODO: this does not include the feature pre-processing */


@@ -33,7 +33,8 @@ namespace cbm
size_t min_iterations_early_stopping,
double epsilon_early_stopping,
bool single_update_per_iteration,
std::string metric);
std::string metric,
bool enable_bin_count);
py::array_t<double> predict(py::buffer x_b, bool explain);
@@ -46,5 +47,7 @@ namespace cbm
void set_y_mean(float mean);
size_t get_iterations() const;
const std::vector<std::vector<uint32_t>> &get_bin_count() const;
};
}


@@ -29,6 +29,52 @@ def test_poisson_random():
# for i, idx in enumerate(x):
# y[i] = y_base[i, idx[0], idx[1]]
def test_nyc_bicycle_validate():
np.random.seed(42)
# read data
bic = pd.read_csv('data/nyc_bb_bicyclist_counts.csv')
bic['Date'] = pd.to_datetime(bic['Date'])
bic['Weekday'] = bic['Date'].dt.weekday
y = bic['BB_COUNT'].values.astype('uint32')
# train/test split
split = int(len(y) * 0.8)
train_idx = np.arange(0, split)
test_idx = np.arange(split + 1, len(y))
y_train = y[train_idx]
y_test = y[test_idx]
test_err_expected = {2: 449.848, 3: 533.465, 4: 503.399, 5: 534.738, 6: 527.854, 7: 529.942, 8: 597.041, 9: 615.646, 10: 560.182}
train_err_expected = {2: 632.521, 3: 578.816, 4: 588.342, 5: 563.843, 6: 552.219, 7: 547.073, 8: 518.893, 9: 525.629, 10: 523.194}
for bins in [2, 3, 4, 5, 6, 7, 8, 9, 10]:
x = np.stack([
bic['Weekday'].values,
pd.qcut(bic['HIGH_T'], bins).cat.codes,
pd.qcut(bic['LOW_T'], bins).cat.codes,
pd.qcut(bic['PRECIP'], 5, duplicates='drop').cat.codes
],
axis=1)\
.astype('uint8')
x_train = x[train_idx, ]
x_test = x[test_idx, ]
# fit CBM model
model = cbm.CBM(single_update_per_iteration=False)
model.fit(x_train, y_train)
y_pred = model.predict(x_test)
y_pred_train = model.predict(x_train)
test_err = mean_squared_error(y_test, y_pred, squared=False)
train_err = mean_squared_error(y_train, y_pred_train, squared=False)
assert test_err_expected[bins] == pytest.approx(test_err, abs=1e-2)
assert train_err_expected[bins] == pytest.approx(train_err, abs=1e-2)
def test_nyc_bicycle():
np.random.seed(42)