From 038b422cc5011a7a4b346f5379e7db96737a69c3 Mon Sep 17 00:00:00 2001 From: Markus Cozowicz Date: Fri, 10 Dec 2021 20:59:22 +0100 Subject: [PATCH] Openmp (#11) * add openmp support for parallelization add bin counting for debugging * add algo precision test * remove parallelization messing with model weights * fix windows/mac build --- cbm/CBM.py | 13 ++- setup.py | 11 ++- src/cbm.cpp | 27 ++++-- src/cbm.h | 216 +++++++++++++++++++++++++--------------------- src/pycbm.cpp | 11 ++- src/pycbm.h | 5 +- tests/test_cbm.py | 46 ++++++++++ 7 files changed, 218 insertions(+), 111 deletions(-) diff --git a/cbm/CBM.py b/cbm/CBM.py index 5290fd2..77d2678 100644 --- a/cbm/CBM.py +++ b/cbm/CBM.py @@ -24,7 +24,8 @@ class CBM(BaseEstimator): single_update_per_iteration:bool = True, date_features: Union[str, List[str]] = 'day,month', binning: Union[int, lambda x: int] = 10, - metric: str = 'rmse' + metric: str = 'rmse', + enable_bin_count: bool = False ) -> None: """Initialize the CBM model. @@ -45,6 +46,7 @@ class CBM(BaseEstimator): self.min_iterations_early_stopping = min_iterations_early_stopping self.epsilon_early_stopping = epsilon_early_stopping self.single_update_per_iteration = single_update_per_iteration + self.enable_bin_count = enable_bin_count # lets make sure it's serializable if isinstance(date_features, list): @@ -95,7 +97,7 @@ class CBM(BaseEstimator): # deal with continuous features bin_num = self.binning if isinstance(self.binning, int) else self.binning(X[col]) - X_binned, bins = pd.qcut(X[col].fillna(0), bin_num, retbins=True) + X_binned, bins = pd.qcut(X[col].fillna(0), bin_num, duplicates='drop', retbins=True) self._feature_names.append(col) self._feature_categories.append(X_binned.cat.categories.astype(str).tolist()) @@ -159,7 +161,8 @@ class CBM(BaseEstimator): self.min_iterations_early_stopping, self.epsilon_early_stopping, self.single_update_per_iteration, - self.metric + self.metric, + self.enable_bin_count ) self.is_fitted_ = True @@ -359,3 +362,7 @@ class CBM(BaseEstimator): @property def iterations(self): return self._cpp.iterations + + @property + def bin_count(self): + return self._cpp.bin_count \ No newline at end of file diff --git a/setup.py b/setup.py index c656a8a..9c283d7 100644 --- a/setup.py +++ b/setup.py @@ -21,8 +21,13 @@ def get_extra_compile_args(): if cflags is None: cflags = "" - return cflags.split() \ - + ["-std=c++11", "-Wall", "-Wextra", "-march=native", "-msse2", "-ffast-math", "-mfpmath=sse"] #, "-fopenmp"] + cflags = cflags.split() \ + + ["-std=c++11", "-Wall", "-Wextra", "-march=native", "-msse2", "-ffast-math", "-mfpmath=sse"] + + if platform.system() == "Linux": + cflags += ["-fopenmp", "-lgomp"] + + return cflags def get_libraries(): if platform.system() == "Windows": @@ -61,7 +66,6 @@ setup( 'interactive': ['matplotlib>=2.2.0'], }, packages=["cbm"], - # package_data={ "cbm": ["src/pycbm.h", "src/cbm.h"] }, ext_modules=[ Extension( "cbm_cpp", @@ -70,6 +74,7 @@ setup( extra_compile_args=get_extra_compile_args(), libraries=get_libraries(), language="c++11", + extra_link_args=['-fopenmp'] if platform.system() == "Linux" else [] ) ], headers=["src/pycbm.h", "src/cbm.h"], diff --git a/src/cbm.cpp b/src/cbm.cpp index be94090..befd6b9 100644 --- a/src/cbm.cpp +++ b/src/cbm.cpp @@ -39,6 +39,10 @@ namespace cbm return _iterations; } + const std::vector> & CBM::get_bin_count() const { + return _bin_count; + } + void CBM::fit( const uint32_t *y, const char *x_data, @@ -54,18 +58,28 @@ namespace cbm double epsilon_early_stopping, bool single_update_per_iteration, uint8_t x_bytes_per_feature, - float (*metric)(const uint32_t*, const double*, size_t n_examples)) + float (*metric)(const uint32_t*, const double*, size_t n_examples), + bool enable_bin_count) { switch (x_bytes_per_feature) { case 1: - fit_internal(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric); + if (enable_bin_count) + fit_internal(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric); + else + fit_internal(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric); break; case 2: - fit_internal(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric); + if (enable_bin_count) + fit_internal(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric); + else + fit_internal(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric); break; case 4: - fit_internal(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric); + if (enable_bin_count) + fit_internal(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric); + else + fit_internal(y, x_data, x_stride0, x_stride1, n_examples, n_features, y_mean, x_max, learning_rate_step_size, max_iterations, min_iterations_early_stopping, epsilon_early_stopping, single_update_per_iteration, metric); break; } } @@ -73,6 +87,7 @@ namespace cbm float metric_RMSE(const uint32_t* y, const double* y_hat, size_t n_examples) { double rmse = 0; + #pragma omp parallel for schedule(static, 10000) reduction(+: rmse) for (size_t i = 0; i < n_examples; i++) rmse += (y_hat[i] - y[i]) * (y_hat[i] - y[i]); @@ -82,6 +97,7 @@ namespace cbm float metric_SMAPE(const uint32_t* y, const double* y_hat, size_t n_examples) { double smape = 0; + #pragma omp parallel for schedule(static, 10000) reduction(+: smape) for (size_t i = 0; i < n_examples; i++) { if (y[i] == 0 && y_hat[i] == 0) continue; @@ -95,9 +111,10 @@ namespace cbm float metric_L1(const uint32_t* y, const double* y_hat, size_t n_examples) { double l1 = 0; + #pragma omp parallel for schedule(static, 10000) reduction(+: l1) for (size_t i = 0; i < n_examples; i++) l1 += std::abs(y_hat[i] - y[i]); - return l1; + return l1 / n_examples; } } \ No newline at end of file diff --git a/src/cbm.h b/src/cbm.h index a186c64..e1a432e 100644 --- a/src/cbm.h +++ b/src/cbm.h @@ -9,6 +9,10 @@ #include #include #include +#include +// #include +// #include +// using namespace std::chrono; namespace cbm { @@ -23,33 +27,48 @@ namespace cbm double _y_mean; size_t _iterations; + std::vector> _bin_count; + + template + void update_y_hat( + std::vector& y_hat, + std::vector> &x, + size_t n_examples, + size_t n_features) + { + // predict + y_hat.assign(n_examples, _y_mean); + + #pragma omp parallel for schedule(static, 10000) + for (size_t i = 0; i < n_examples; i++) + for (size_t j = 0; j < n_features; j++) + y_hat[i] *= _f[j][x[j][i]]; + } template void update_y_hat_sum( + std::vector& y_hat, std::vector> &y_hat_sum, std::vector> &x, size_t n_examples, size_t n_features) { + update_y_hat(y_hat, x, n_examples, n_features); + // reset y_hat_sum + #pragma omp parallel for for (size_t j = 0; j < n_features; j++) std::fill(y_hat_sum[j].begin(), y_hat_sum[j].end(), 0); // compute y_hat and y_hat_sum - for (size_t i = 0; i < n_examples; i++) - { - // TODO: parallelize & vectorize - // TODO: use log to stabilize? - auto y_hat_i = _y_mean; - for (size_t j = 0; j < n_features; j++) - y_hat_i *= _f[j][x[j][i]]; - - for (size_t j = 0; j < n_features; j++) - y_hat_sum[j][x[j][i]] += y_hat_i; - } + #pragma omp parallel for + for (size_t j = 0; j < n_features; j++) + for (size_t i = 0; i < n_examples; i++) + // TODO: use log to stabilize? + y_hat_sum[j][x[j][i]] += y_hat[i]; } - template + template void fit_internal( const uint32_t *y, const char *x_data, @@ -66,104 +85,104 @@ namespace cbm bool single_update_per_iteration, float (*metric)(const uint32_t*, const double*, size_t n_examples)) { + _y_mean = y_mean; - _y_mean = y_mean; + // allocation + std::vector> x(n_features); // n_features x n_examples + std::vector> g(n_features); // n_features x max_bin[j] (jagged) + std::vector> y_sum(n_features); // n_features x max_bin[j] (jagged) + std::vector> y_hat_sum(n_features); // n_features x max_bin[j] (jagged) + std::vector y_hat(n_examples); - // allocation - std::vector> x(n_features); // n_features x n_examples - std::vector> g(n_features); // n_features x max_bin[j] (jagged) - std::vector> y_sum(n_features); // n_features x max_bin[j] (jagged) - std::vector> y_hat_sum(n_features); // n_features x max_bin[j] (jagged) - // std::vector> bin_count(n_features); - std::vector y_hat(n_examples); + _f.resize(n_features); + if (enableBinCount) + _bin_count.resize(n_features); - _f.resize(n_features); - - for (size_t j = 0; j < n_features; j++) - { - uint32_t max_bin = x_max[j]; - - g[j].resize(max_bin + 1); - _f[j].resize(max_bin + 1, 1); - y_sum[j].resize(max_bin + 1); - y_hat_sum[j].resize(max_bin + 1); - // bin_count[j].resize(max_bin + 1); - - // alloc and store columnar - x[j].reserve(n_examples); - for (size_t i = 0; i < n_examples; i++) - { - // https://docs.python.org/3/c-api/buffer.html#complex-arrays - // strides are expressed in char not target type - T x_ij = *reinterpret_cast(x_data + i * x_stride0 + j * x_stride1); - x[j].push_back(x_ij); - - y_sum[j][x_ij] += y[i]; - // bin_count[j][x_ij]++; - } - } - - // iterations - double learning_rate = learning_rate_step_size; - double rmse0 = std::numeric_limits::infinity(); - - for (_iterations = 0; _iterations < max_iterations; _iterations++, learning_rate += learning_rate_step_size) - { - // cap at 1 - if (learning_rate > 1) - learning_rate = 1; - - update_y_hat_sum(y_hat_sum, x, n_examples, n_features); - - // compute g - // #pragma omp for // didn't observe improvement + #pragma omp parallel for for (size_t j = 0; j < n_features; j++) { - for (size_t k = 0; k <= x_max[j]; k++) + uint32_t max_bin = x_max[j]; + + g[j].resize(max_bin + 1); + _f[j].resize(max_bin + 1, 1); + y_sum[j].resize(max_bin + 1); + y_hat_sum[j].resize(max_bin + 1); + + if (enableBinCount) + _bin_count[j].resize(max_bin + 1, 0); + + // alloc and store columnar + x[j].reserve(n_examples); + for (size_t i = 0; i < n_examples; i++) { + // https://docs.python.org/3/c-api/buffer.html#complex-arrays + // strides are expressed in char not target type + T x_ij = *reinterpret_cast(x_data + i * x_stride0 + j * x_stride1); + x[j].push_back(x_ij); - // TODO: check if a bin is empty. might be better to remap/exclude the bins? - if (y_sum[j][k]) - { - // improve stability - double g = (double)y_sum[j][k] / y_hat_sum[j][k]; // eqn. 2 (a) + y_sum[j][x_ij] += y[i]; - // magic numbers found in Regularization section (worsen it quite a bit) - // double g = (2.0 * y_sum[j][k]) / (1.67834 * y_hat_sum[j][k]); // eqn. 2 (a) - - if (learning_rate == 1) - _f[j][k] *= g; - else - _f[j][k] *= std::exp(learning_rate * std::log(g)); // eqn 2 (b) + eqn 4 - - if (!single_update_per_iteration) - update_y_hat_sum(y_hat_sum, x, n_examples, n_features); - } + if (enableBinCount) + _bin_count[j][x_ij]++; } } - // predict - y_hat.assign(n_examples, _y_mean); - for (size_t i = 0; i < n_examples; i++) - for (size_t j = 0; j < n_features; j++) - y_hat[i] *= _f[j][x[j][i]]; + // iterations + double learning_rate = learning_rate_step_size; + double rmse0 = std::numeric_limits::infinity(); - double rmse = metric(y, y_hat.data(), n_examples); - - // check for early stopping - // TODO: expose minimum number of rounds - if (_iterations > min_iterations_early_stopping && - (rmse > rmse0 || (rmse0 - rmse) < epsilon_early_stopping)) + for (_iterations = 0; _iterations < max_iterations; _iterations++, learning_rate += learning_rate_step_size) { - // TODO: record diagnostics? - // printf("early stopping %1.4f vs %1.4f after t=%d\n", rmse, rmse0, (int)t); - break; + // cap at 1 + if (learning_rate > 1) + learning_rate = 1; + + update_y_hat_sum(y_hat, y_hat_sum, x, n_examples, n_features); + + // compute g + for (size_t j = 0; j < n_features; j++) + { + for (size_t k = 0; k <= x_max[j]; k++) + { + + // TODO: check if a bin is empty. might be better to remap/exclude the bins? + if (y_sum[j][k]) + { + // improve stability + double g = (double)y_sum[j][k] / y_hat_sum[j][k]; // eqn. 2 (a) + + // magic numbers found in Regularization section (worsen it quite a bit) + // double g = (2.0 * y_sum[j][k]) / (1.67834 * y_hat_sum[j][k]); // eqn. 2 (a) + + if (learning_rate == 1) + _f[j][k] *= g; + else + _f[j][k] *= std::exp(learning_rate * std::log(g)); // eqn 2 (b) + eqn 4 + + if (!single_update_per_iteration) { + update_y_hat_sum(y_hat, y_hat_sum, x, n_examples, n_features); + } + } + } + } + + // prediction + update_y_hat(y_hat, x, n_examples, n_features); + + double rmse = metric(y, y_hat.data(), n_examples); + + // check for early stopping + // TODO: expose minimum number of rounds + if (_iterations > min_iterations_early_stopping && + (rmse > rmse0 || (rmse0 - rmse) < epsilon_early_stopping)) + { + // TODO: record diagnostics? + // printf("early stopping %1.4f vs %1.4f after t=%d\n", rmse, rmse0, (int)t); + break; + } + rmse0 = rmse; } - rmse0 = rmse; } - } - - public: CBM(); @@ -184,7 +203,8 @@ namespace cbm double epsilon_early_stopping, bool single_update_per_iteration, uint8_t x_bytes_per_feature, - float (*metric)(const uint32_t*, const double*, size_t n_examples)); + float (*metric)(const uint32_t*, const double*, size_t n_examples), + bool enable_bin_count); template void predict( @@ -203,7 +223,7 @@ namespace cbm double *out_y_hat = out_data; std::fill(out_y_hat, out_y_hat + n_examples, _y_mean); - // TODO: batch parallelization + #pragma omp parallel for schedule(static, 10000) for (size_t i = 0; i < n_examples; i++) { double &y_hat_i = *(out_y_hat + i); @@ -229,5 +249,7 @@ namespace cbm void set_y_mean(float mean); size_t get_iterations() const; + + const std::vector> &get_bin_count() const; }; } \ No newline at end of file diff --git a/src/pycbm.cpp b/src/pycbm.cpp index 1afe662..08e7798 100644 --- a/src/pycbm.cpp +++ b/src/pycbm.cpp @@ -27,7 +27,8 @@ namespace cbm size_t min_iterations_early_stopping, double epsilon_early_stopping, bool single_update_per_iteration, - std::string metric) + std::string metric, + bool enable_bin_count) { // can't check compare just the format as linux returns I, windows returns L when using astype('uint32') @@ -114,7 +115,8 @@ namespace cbm epsilon_early_stopping, single_update_per_iteration, (uint8_t)x_info.itemsize, - metric_func); + metric_func, + enable_bin_count); } py::array_t PyCBM::predict(py::buffer x_b, bool explain) @@ -197,6 +199,10 @@ namespace cbm { return _cbm.get_iterations(); } + + const std::vector> &PyCBM::get_bin_count() const { + return _cbm.get_bin_count(); + } }; PYBIND11_MODULE(cbm_cpp, m) @@ -210,6 +216,7 @@ PYBIND11_MODULE(cbm_cpp, m) .def_property("y_mean", &cbm::PyCBM::get_y_mean, &cbm::PyCBM::set_y_mean) .def_property("weights", &cbm::PyCBM::get_weights, &cbm::PyCBM::set_weights) .def_property_readonly("iterations", &cbm::PyCBM::get_iterations) + .def_property_readonly("bin_count", &cbm::PyCBM::get_bin_count) .def(py::pickle( [](const cbm::PyCBM &p) { // __getstate__ /* TODO: this does not include the feature pre-processing */ diff --git a/src/pycbm.h b/src/pycbm.h index 4dca397..26cfed8 100644 --- a/src/pycbm.h +++ b/src/pycbm.h @@ -33,7 +33,8 @@ namespace cbm size_t min_iterations_early_stopping, double epsilon_early_stopping, bool single_update_per_iteration, - std::string metric); + std::string metric, + bool enable_bin_count); py::array_t predict(py::buffer x_b, bool explain); @@ -46,5 +47,7 @@ namespace cbm void set_y_mean(float mean); size_t get_iterations() const; + + const std::vector> &get_bin_count() const; }; } \ No newline at end of file diff --git a/tests/test_cbm.py b/tests/test_cbm.py index f047dec..3223a6e 100644 --- a/tests/test_cbm.py +++ b/tests/test_cbm.py @@ -29,6 +29,52 @@ def test_poisson_random(): # for i, idx in enumerate(x): # y[i] = y_base[i, idx[0], idx[1]] +def test_nyc_bicycle_validate(): + np.random.seed(42) + + # read data + bic = pd.read_csv('data/nyc_bb_bicyclist_counts.csv') + bic['Date'] = pd.to_datetime(bic['Date']) + bic['Weekday'] = bic['Date'].dt.weekday + + y = bic['BB_COUNT'].values.astype('uint32') + + # train/test split + split = int(len(y) * 0.8) + train_idx = np.arange(0, split) + test_idx = np.arange(split + 1, len(y)) + + y_train = y[train_idx] + y_test = y[test_idx] + + test_err_expected = {2: 449.848, 3: 533.465, 4: 503.399, 5: 534.738, 6: 527.854, 7: 529.942, 8: 597.041, 9: 615.646, 10: 560.182} + train_err_expected = {2: 632.521, 3: 578.816, 4: 588.342, 5: 563.843, 6: 552.219, 7: 547.073, 8: 518.893, 9: 525.629, 10: 523.194} + + for bins in [2, 3, 4, 5, 6, 7, 8, 9, 10]: + x = np.stack([ + bic['Weekday'].values, + pd.qcut(bic['HIGH_T'], bins).cat.codes, + pd.qcut(bic['LOW_T'], bins).cat.codes, + pd.qcut(bic['PRECIP'], 5, duplicates='drop').cat.codes + ], + axis=1)\ + .astype('uint8') + + x_train = x[train_idx, ] + x_test = x[test_idx, ] + + # fit CBM model + model = cbm.CBM(single_update_per_iteration=False) + model.fit(x_train, y_train) + + y_pred = model.predict(x_test) + y_pred_train = model.predict(x_train) + + test_err = mean_squared_error(y_test, y_pred, squared=False) + train_err = mean_squared_error(y_train, y_pred_train, squared=False) + + assert test_err_expected[bins] == pytest.approx(test_err, abs=1e-2) + assert train_err_expected[bins] == pytest.approx(train_err, abs=1e-2) def test_nyc_bicycle(): np.random.seed(42)