Replace HSIC and KCI with causal-learn package functions

The causal-learn package provides more sophisticated implementations of the kernel independence tests. Instead of maintaining our own reimplementations, we now utilize the existing functions from causal-learn.

Signed-off-by: Patrick Bloebaum <bloebp@amazon.com>
Patrick Bloebaum 2022-10-26 17:21:06 -07:00 committed by Patrick Blöbaum
Parent daf5cfc184
Commit b9cfc7f223
4 changed files: 37 additions and 278 deletions
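For context, a minimal sketch of the causal-learn entry points this commit switches to; the data below is synthetic and est_width="median" mirrors the default the new code sets:

import numpy as np
from causallearn.utils.KCI.KCI import KCI_CInd, KCI_UInd

rng = np.random.default_rng(0)
Z = rng.normal(size=(500, 1))
X = Z + rng.normal(size=(500, 1))
Y = Z + rng.normal(size=(500, 1))

# Pairwise test: p-value for the null hypothesis that X and Y are independent.
p_pairwise = KCI_UInd(est_width="median").compute_pvalue(X, Y)[0]

# Conditional test: p-value for the null hypothesis that X and Y are
# independent given Z.
p_conditional = KCI_CInd(est_width="median").compute_pvalue(X, Y, Z)[0]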


@@ -10,6 +10,7 @@ def independence_test(X, Y, conditioned_on=None, method="kernel"):
* K. Zhang, J. Peters, D. Janzing, B. Schölkopf. *Kernel-based Conditional Independence Test and Application in Causal Discovery*. UAI'11, Pages 804–813, 2011.
* A. Gretton, K. Fukumizu, C.-H. Teo, L. Song, B. Schölkopf, A. Smola. *A Kernel Statistical Test of Independence*. NIPS 21, 2007.
Here, we utilize the implementations of the https://github.com/cmu-phil/causal-learn package.
* `approx_kernel`: Approximate kernel-based (conditional) independence test.
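As a usage sketch of the dispatcher documented above (synthetic data; assuming independence_test is importable from this module, as kernel_based is in the tests below):

import numpy as np
from dowhy.gcm.independence_test import independence_test

x = np.random.randn(500, 1)
y = x + np.random.randn(500, 1)

# A small p-value is expected here, since y directly depends on x.
p = independence_test(x, y, method="kernel")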


@@ -2,21 +2,16 @@
future.
"""
from typing import Callable, List, Optional, Tuple, Union
from typing import Callable, List, Optional, Union
import numpy as np
import scipy
from causallearn.utils.KCI.KCI import KCI_CInd, KCI_UInd
from joblib import Parallel, delayed
from numpy.linalg import LinAlgError, pinv, svd
from scipy.stats import gamma
from sklearn.preprocessing import scale
import dowhy.gcm.config as config
from dowhy.gcm.constant import EPS
from dowhy.gcm.independence_test.kernel_operation import (
apply_rbf_kernel_with_adaptive_precision,
approximate_rbf_kernel_features,
)
from dowhy.gcm.independence_test.kernel_operation import approximate_rbf_kernel_features
from dowhy.gcm.stats import quantile_based_fwer
from dowhy.gcm.util.general import apply_one_hot_encoding, fit_one_hot_encoders, set_random_seed, shape_into_2d
@@ -25,39 +20,37 @@ def kernel_based(
X: np.ndarray,
Y: np.ndarray,
Z: Optional[np.ndarray] = None,
kernel: Callable[[np.ndarray], np.ndarray] = apply_rbf_kernel_with_adaptive_precision,
scale_data: bool = True,
use_bootstrap: bool = True,
bootstrap_num_runs: int = 20,
bootstrap_num_runs: int = 10,
bootstrap_num_samples_per_run: int = 2000,
bootstrap_n_jobs: Optional[int] = None,
p_value_adjust_func: Callable[[Union[np.ndarray, List[float]]], float] = quantile_based_fwer,
**kwargs,
) -> float:
"""Prepares the data and uses kernel (conditional) independence test. The independence test estimates a p-value
for the null hypothesis that X and Y are independent (given Z). Depending whether Z is given, a conditional or
pairwise independence test is performed.
If Z is given: Using KCI as conditional independence test.
If Z is not given: Using HSIC as pairwise independence test.
Here, we utilize the implementations of the https://github.com/cmu-phil/causal-learn package.
If Z is given: Using KCI as conditional independence test, i.e. we use https://github.com/cmu-phil/causal-learn/blob/main/causallearn/utils/KCI/KCI.py#L238.
If Z is not given: Using KCI as pairwise independence test, i.e. we use https://github.com/cmu-phil/causal-learn/blob/main/causallearn/utils/KCI/KCI.py#L17.
Note:
- The data can be multivariate, i.e. the given input matrices can have multiple columns.
- Categorical data need to be represented as strings.
- It is possible to apply a different kernel to each column in the matrices. For instance, an RBF kernel for the
first dimension in X and a delta kernel for the second.
Based on the work:
- Conditional: K. Zhang, J. Peters, D. Janzing, B. Schölkopf. *Kernel-based Conditional Independence Test and Application in Causal Discovery*. UAI'11, Pages 804–813, 2011.
- Pairwise: A. Gretton, K. Fukumizu, C.-H. Teo, L. Song, B. Schölkopf, A. Smola. *A Kernel Statistical Test of Independence*. NIPS 21, 2007.
- K. Zhang, J. Peters, D. Janzing, B. Schölkopf. *Kernel-based Conditional Independence Test and Application in Causal Discovery*. UAI'11, Pages 804–813, 2011.
- A. Gretton, K. Fukumizu, C.-H. Teo, L. Song, B. Schölkopf, A. Smola. *A Kernel Statistical Test of Independence*. NIPS 21, 2007.
For more information about configuring the kernel independence test, see:
- https://github.com/cmu-phil/causal-learn/blob/main/causallearn/utils/KCI/KCI.py#L17 (if Z is not given)
- https://github.com/cmu-phil/causal-learn/blob/main/causallearn/utils/KCI/KCI.py#L238 (if Z is given)
:param X: Data matrix for observations from X.
:param Y: Data matrix for observations from Y.
:param Z: Optional data matrix for observations from Z. This is the conditional variable.
:param kernel: A kernel for estimating the pairwise similarities between samples. The expected input is an n x d
numpy array and the output is expected to be an n x n numpy array. By default, the RBF kernel is used.
:param scale_data: If set to True, the data will be standardized. If set to False, the data is taken as it is.
Standardizing the data helps in identifying weak dependencies. If one is only interested in
stronger ones, consider setting this to False.
:param use_bootstrap: If True, the independence tests are performed on multiple subsets of the data and the final
p-value is constructed based on the provided p_value_adjust_func function.
:param bootstrap_num_runs: Number of bootstrap runs (only relevant if use_bootstrap is True).
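A hypothetical call illustrating the API described above (the import path matches the test file further below; the data is synthetic):

import numpy as np
from dowhy.gcm.independence_test import kernel_based

rng = np.random.default_rng(0)
Z = rng.normal(size=(1000, 1))
X = Z + rng.normal(size=(1000, 1))
Y = Z + rng.normal(size=(1000, 1))

p_pairwise = kernel_based(X, Y)        # X and Y are dependent through Z.
p_conditional = kernel_based(X, Y, Z)  # Independent once we condition on Z.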
@@ -83,24 +76,20 @@ def kernel_based(
# If Z is empty, we are in the pairwise setting.
Z = None
if "est_width" not in kwargs:
kwargs["est_width"] = "median"
def evaluate_kernel_test_on_samples(
X: np.ndarray, Y: np.ndarray, Z: np.ndarray, parallel_random_seed: int
) -> float:
set_random_seed(parallel_random_seed)
try:
if Z is None:
return _hsic(X, Y, kernel=kernel, scale_data=scale_data)
else:
return _kci(X, Y, Z, kernel=kernel, scale_data=scale_data)
except LinAlgError:
# TODO: This is a temporary workaround.
# Under some circumstances, the KCI test throws a "numpy.linalg.LinAlgError: SVD did not converge"
# error, depending on the data samples. This is related to the SVD algorithms used by numpy.
# There is a robust version of SVD, but it is not included in numpy.
# This could be addressed by augmenting the data, using a different SVD implementation, or by
# waiting until numpy updates the underlying algorithm.
return np.nan
if Z is None:
X, Y = _convert_to_numeric(*shape_into_2d(X, Y))
return KCI_UInd(**kwargs).compute_pvalue(X, Y)[0]
else:
X, Y, Z = _convert_to_numeric(*shape_into_2d(X, Y, Z))
return KCI_CInd(**kwargs).compute_pvalue(X, Y, Z)[0]
if use_bootstrap and X.shape[0] > bootstrap_num_samples_per_run:
random_indices = [
@@ -229,205 +218,6 @@ def approx_kernel_based(
)
def _kci(
X: np.ndarray,
Y: np.ndarray,
Z: np.ndarray,
kernel: Callable[[np.ndarray], np.ndarray],
scale_data: bool,
regularization_param: float = 10**-3,
) -> float:
"""
Tests the null hypothesis that X and Y are independent given Z using the kernel conditional independence test.
This is a corrected reimplementation of the KCI method in the CondIndTests R-package. Authors of the original R
package: Christina Heinze-Deml, Jonas Peters, Asbjoern Marco Sinius Munk
:return: The p-value for the null hypothesis that X and Y are independent given Z.
"""
X, Y, Z = _convert_to_numeric(*shape_into_2d(X, Y, Z))
if not (X.shape[0] == Y.shape[0] == Z.shape[0]):
raise RuntimeError("All variables need to have the same number of samples!")
n = X.shape[0]
if scale_data:
X = scale(X)
Y = scale(Y)
Z = scale(Z)
k_x = kernel(X)
k_y = kernel(Y)
k_z = kernel(Z)
k_xz = k_x * k_z
k_xz = _fast_centering(k_xz)
k_y = _fast_centering(k_y)
k_z = _fast_centering(k_z)
r_z = np.eye(n) - k_z @ pinv(k_z + regularization_param * np.eye(n))
k_xz_z = r_z @ k_xz @ r_z.T
k_y_z = r_z @ k_y @ r_z.T
# Not dividing by n, since the expectation and variance are also not divided by n and n**2, respectively.
statistic = np.sum(k_xz_z * k_y_z.T)
# Averaging each matrix with its transpose because, due to numerical issues, it might not be exactly symmetric.
eigen_vec_k_xz_z, eigen_val_k_xz_z, _ = svd((k_xz_z + k_xz_z.T) / 2)
eigen_vec_k_y_z, eigen_val_k_y_z, _ = svd((k_y_z + k_y_z.T) / 2)
# Filter out eigenvalues that are too small.
eigen_val_k_xz_z, eigen_vec_k_xz_z = _filter_out_small_eigen_values_and_vectors(eigen_val_k_xz_z, eigen_vec_k_xz_z)
eigen_val_k_y_z, eigen_vec_k_y_z = _filter_out_small_eigen_values_and_vectors(eigen_val_k_y_z, eigen_vec_k_y_z)
if len(eigen_val_k_xz_z) == 1:
empirical_kernel_map_xz_z = eigen_vec_k_xz_z * np.sqrt(eigen_val_k_xz_z)
else:
empirical_kernel_map_xz_z = eigen_vec_k_xz_z @ (np.eye(len(eigen_val_k_xz_z)) * np.sqrt(eigen_val_k_xz_z)).T
empirical_kernel_map_xz_z = empirical_kernel_map_xz_z.squeeze()
empirical_kernel_map_xz_z = empirical_kernel_map_xz_z.reshape(empirical_kernel_map_xz_z.shape[0], -1)
if len(eigen_val_k_y_z) == 1:
empirical_kernel_map_y_z = eigen_vec_k_y_z * np.sqrt(eigen_val_k_y_z)
else:
empirical_kernel_map_y_z = eigen_vec_k_y_z @ (np.eye(len(eigen_val_k_y_z)) * np.sqrt(eigen_val_k_y_z)).T
empirical_kernel_map_y_z = empirical_kernel_map_y_z.squeeze()
empirical_kernel_map_y_z = empirical_kernel_map_y_z.reshape(empirical_kernel_map_y_z.shape[0], -1)
num_eigen_vec_xz_z = empirical_kernel_map_xz_z.shape[1]
num_eigen_vec_y_z = empirical_kernel_map_y_z.shape[1]
size_w = num_eigen_vec_xz_z * num_eigen_vec_y_z
w = (empirical_kernel_map_y_z[:, None] * empirical_kernel_map_xz_z[..., None]).reshape(
empirical_kernel_map_y_z.shape[0], -1
)
if size_w > n:
ww_prod = w @ w.T
else:
ww_prod = w.T @ w
return _estimate_p_value(ww_prod, statistic)
def _fast_centering(k: np.ndarray) -> np.ndarray:
"""Compute centered kernel matrix in time O(n^2).
The centered kernel matrix is defined as K_c = H @ K @ H, with
H = identity - 1/n * ones(n, n). Computing H @ K @ H via matrix multiplication scales with n^3. The
implementation circumvents this and runs in O(n^2) time.
:param k: original kernel matrix of size nxn
:return: centered kernel matrix of size nxn
"""
n = len(k)
k_c = (
k
- 1 / n * np.outer(np.ones(n), np.sum(k, axis=0))
- 1 / n * np.outer(np.sum(k, axis=1), np.ones(n))
+ 1 / n**2 * np.sum(k) * np.ones((n, n))
)
return k_c
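A quick numerical sanity check of the O(n^2) shortcut, mirroring the unit test removed further below (assumes _fast_centering as defined above):

import numpy as np

n = 100
k = np.random.normal(0, 1, (n, n))
h = np.eye(n) - np.ones((n, n)) / n
# The closed-form centering should match the textbook H @ K @ H product.
assert np.allclose(_fast_centering(k), h @ k @ h)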
def _hsic(
X: np.ndarray,
Y: np.ndarray,
kernel: Callable[[np.ndarray], np.ndarray],
scale_data: bool,
cut_off_value: float = EPS,
) -> float:
"""
Estimates the Hilbert-Schmidt Independence Criterion score for a pairwise independence test between variables X
and Y.
This is a reimplementation of the original Matlab code provided by the authors.
:return: The p-value for the null hypothesis that X and Y are independent.
"""
X, Y = _convert_to_numeric(*shape_into_2d(X, Y))
if X.shape[0] != Y.shape[0]:
raise RuntimeError("All variables need to have the same number of samples!")
if X.shape[0] < 6:
raise RuntimeError(
"At least 6 samples are required for the HSIC independence test. Only %d were given." % X.shape[0]
)
n = X.shape[0]
if scale_data:
X = scale(X)
Y = scale(Y)
k_mat = kernel(X)
l_mat = kernel(Y)
k_c = _fast_centering(k_mat)
l_c = _fast_centering(l_mat)
# The test statistic is given as np.trace(K @ H @ L @ H) / n. The computation below avoids the explicit matrix products.
test_statistic = (
1
/ n
* (
np.sum(k_mat * l_mat)
- 2 / n * np.sum(k_mat, axis=0) @ np.sum(l_mat, axis=1)
+ 1 / n**2 * np.sum(k_mat) * np.sum(l_mat)
)
)
var_hsic = (k_c * l_c) ** 2
var_hsic = (np.sum(var_hsic) - np.trace(var_hsic)) / n / (n - 1)
var_hsic = var_hsic * 2 * (n - 4) * (n - 5) / n / (n - 1) / (n - 2) / (n - 3)
k_mat = k_mat - np.diag(np.diag(k_mat))
l_mat = l_mat - np.diag(np.diag(l_mat))
bone = np.ones((n, 1), dtype=float)
mu_x = (bone.T @ k_mat @ bone) / n / (n - 1)
mu_y = (bone.T @ l_mat @ bone) / n / (n - 1)
m_hsic = (1 + mu_x * mu_y - mu_x - mu_y) / n
var_hsic = max(var_hsic.squeeze(), cut_off_value)
m_hsic = max(m_hsic.squeeze(), cut_off_value)
if test_statistic <= cut_off_value:
test_statistic = 0
al = m_hsic**2 / var_hsic
bet = var_hsic * n / m_hsic
p_value = 1 - gamma.cdf(test_statistic, al, scale=bet)
return p_value
def _filter_out_small_eigen_values_and_vectors(
eigen_values: np.ndarray, eigen_vectors: np.ndarray, relative_tolerance: float = (10**-5)
) -> Tuple[np.ndarray, np.ndarray]:
filtered_indices_xz_z = np.where(eigen_values > max(eigen_values) * relative_tolerance)
return eigen_values[filtered_indices_xz_z], eigen_vectors[:, filtered_indices_xz_z]
def _estimate_p_value(ww_prod: np.ndarray, statistic: np.ndarray) -> float:
# Dividing by n not required since we do not divide the test statistic by n.
mean_approx = np.trace(ww_prod)
variance_approx = 2 * np.trace(ww_prod @ ww_prod)
alpha_approx = mean_approx**2 / variance_approx
beta_approx = variance_approx / mean_approx
return 1 - gamma.cdf(statistic, alpha_approx, scale=beta_approx)
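The approximation above moment-matches a gamma distribution to the null distribution of the statistic: a Gamma(alpha, scale=beta) variable has mean alpha * beta and variance alpha * beta**2, so matching an estimated mean m and variance v yields alpha = m**2 / v and beta = v / m. A sketch with hypothetical numbers:

from scipy.stats import gamma

m, v = 2.0, 0.5  # hypothetical estimated null mean and variance
alpha, beta = m**2 / v, v / m
# P(statistic >= 3.0) under the fitted gamma null distribution.
p_value = 1 - gamma.cdf(3.0, alpha, scale=beta)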
def _rit(
X: np.ndarray,
Y: np.ndarray,
@@ -581,7 +371,16 @@ def _estimate_column_wise_covariances(X: np.ndarray, Y: np.ndarray) -> np.ndarray:
def _convert_to_numeric(*args) -> List[np.ndarray]:
return [apply_one_hot_encoding(X, fit_one_hot_encoders(X)) for X in args]
result = []
for X in args:
X = np.array(X)
for col in range(X.shape[1]):
if isinstance(X[0, col], bool):
X[:, col] = X[:, col].astype(str)
result.append(apply_one_hot_encoding(X, fit_one_hot_encoders(X)))
return result
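A hypothetical input illustrating the boolean handling above; the exact encoded output depends on fit_one_hot_encoders and apply_one_hot_encoding:

import numpy as np

# One numeric column and one boolean column.
data = np.array([[1.5, True], [0.3, False], [2.1, True]], dtype=object)

# The boolean entries become the strings "True"/"False" first, so the one-hot
# encoders treat that column as categorical rather than numeric.
numeric = _convert_to_numeric(data)[0]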
def _remove_constant_columns(X: np.ndarray) -> np.ndarray:


@@ -2,13 +2,13 @@
future.
"""
from typing import Callable, List, Optional
from typing import Optional
import numpy as np
from sklearn.kernel_approximation import Nystroem
from sklearn.metrics import euclidean_distances
from dowhy.gcm.util.general import is_categorical, shape_into_2d
from dowhy.gcm.util.general import shape_into_2d
def apply_rbf_kernel(X: np.ndarray, precision: Optional[float] = None) -> np.ndarray:
@@ -98,32 +98,6 @@ def approximate_delta_kernel_features(X: np.ndarray, num_random_components: int)
return result
def auto_create_list_of_kernels(X: np.ndarray) -> List[Callable[[np.ndarray], np.ndarray]]:
X = shape_into_2d(X)
tmp_list = []
for i in range(X.shape[1]):
if not is_categorical(X[:, i]):
tmp_list.append(apply_rbf_kernel)
else:
tmp_list.append(apply_delta_kernel)
return tmp_list
def auto_create_list_of_kernel_approximations(X: np.ndarray) -> List[Callable[[np.ndarray, int], np.ndarray]]:
X = shape_into_2d(X)
tmp_list = []
for i in range(X.shape[1]):
if not is_categorical(X[:, i]):
tmp_list.append(approximate_rbf_kernel_features)
else:
tmp_list.append(approximate_delta_kernel_features)
return tmp_list
def _median_based_precision(distances: np.ndarray) -> float:
tmp = np.sqrt(distances)
tmp = tmp - np.tril(tmp, -1)


@@ -2,11 +2,9 @@ import random
import numpy as np
import pytest
from _pytest.python_api import approx
from flaky import flaky
from dowhy.gcm.independence_test import approx_kernel_based, kernel_based
from dowhy.gcm.independence_test.kernel import _fast_centering
@flaky(max_runs=5)
@@ -62,11 +60,6 @@ def test_given_random_seed_when_perform_conditional_kernel_based_test_then_retur
assert result_1 == result_2
def test_given_too_few_samples_when_perform_kernel_based_test_then_raise_error():
with pytest.raises(RuntimeError):
kernel_based(np.array([1, 2, 3, 4]), np.array([1, 3, 2, 4]))
@flaky(max_runs=5)
def test_given_continuous_independent_data_when_perform_kernel_based_test_then_not_reject():
x = np.random.randn(1000, 1)
@@ -314,14 +307,6 @@ def test_given_random_seed_when_perform_pairwise_approx_kernel_based_test_then_r
assert result_1 == result_2
def test_when_using_fast_centering_then_gives_expected_results():
X = np.random.normal(0, 1, (100, 100))
h = np.identity(X.shape[0]) - np.ones((X.shape[0], X.shape[0]), dtype=float) / X.shape[0]
assert _fast_centering(X) == approx(h @ X @ h)
@flaky(max_runs=3)
def test_given_weak_dependency_when_perform_kernel_based_test_then_returns_expected_result():
X = np.random.choice(2, (10000, 100)) # Require a lot of data here for the bootstraps.