Add and remove some unit tests in gcm module

Signed-off-by: Patrick Bloebaum <bloebp@amazon.com>
This commit is contained in:
Patrick Bloebaum 2023-03-06 11:29:16 -08:00 коммит произвёл Patrick Blöbaum
Родитель 7ca6de528a
Коммит 7e370f9c64
8 изменённых файлов: 519 добавлений и 37 удалений

Просмотреть файл

@ -16,6 +16,7 @@ from dowhy.gcm import (
arrow_strength,
fit,
)
from dowhy.gcm.auto import assign_causal_mechanisms
from dowhy.gcm.divergence import estimate_kl_divergence_continuous
from dowhy.gcm.influence import arrow_strength_of_model
from dowhy.gcm.ml import create_linear_regressor, create_logistic_regression_classifier
@ -49,7 +50,7 @@ def test_given_continuous_data_with_default_attribution_func_when_estimate_arrow
@flaky(max_runs=3)
def test_given_gcm_with_misspecified_mechanism_when_evaluate_arrow_strength_with__observational_data_then_gives_expected_results():
def test_given_gcm_with_misspecified_mechanism_when_evaluate_arrow_strength_with_observational_data_then_gives_expected_results():
causal_model = ProbabilisticCausalModel(nx.DiGraph([("X1", "X2"), ("X0", "X2")]))
# Here, we misspecified the mechanism on purpose by setting scale to 1 instead of 2.
causal_model.set_causal_mechanism("X0", ScipyDistribution(stats.norm, loc=0, scale=1))
@ -87,6 +88,99 @@ def test_given_categorical_target_node_when_estimate_arrow_strength_of_model_cla
assert arrow_strength_of_model(classifier_sem, X) == approx(np.array([0.3, 0.3, 0, 0, 0]), abs=0.1)
def test_given_fixed_random_seed_when_estimate_arrow_strength_then_return_deterministid_result(
    preserve_random_generator_state,
):
    """Arrow strengths differ between unseeded runs and coincide when the numpy seed is reset before each run."""
    scm = ProbabilisticCausalModel(nx.DiGraph([("X1", "X2"), ("X0", "X2")]))
    scm.set_causal_mechanism("X1", ScipyDistribution(stats.norm, loc=0, scale=1))
    scm.set_causal_mechanism("X0", ScipyDistribution(stats.norm, loc=0, scale=1))
    scm.set_causal_mechanism("X2", AdditiveNoiseModel(prediction_model=create_linear_regressor()))

    X0 = np.random.normal(0, 1, 1000)
    X1 = np.random.normal(0, 1, 1000)
    target = 3 * X0 + X1 + np.random.normal(0, 0.2, X0.shape[0])
    fit(scm, pd.DataFrame({"X0": X0, "X1": X1, "X2": target}))

    def estimate(seed=None):
        # Optionally reset numpy's global seed right before the estimation.
        if seed is not None:
            np.random.seed(seed)
        return arrow_strength(scm, "X2", max_num_runs=5, n_jobs=-1)

    edges = [("X0", "X2"), ("X1", "X2")]

    # Without a fixed seed, two consecutive estimates are expected to differ.
    first = estimate()
    second = estimate()
    for edge in edges:
        assert first[edge] != second[edge]

    # With the same seed set before each run, the estimates must match exactly.
    first = estimate(seed=0)
    second = estimate(seed=0)
    for edge in edges:
        assert first[edge] == second[edge]
@flaky(max_runs=3)
def test_given_misspecified_graph_when_estimating_direct_arrow_strength_with_observed_data_then_returns_correct_result():
    """Compare arrow strengths into X2 for a graph lacking the X0 -> X1 edge (evaluated on observed
    parent samples) against a graph that contains this edge."""

    def unit_gaussian_anm():
        # Linear additive noise model with an explicitly given standard normal noise distribution.
        return AdditiveNoiseModel(create_linear_regressor(), ScipyDistribution(stats.norm, loc=0, scale=1))

    Z = np.random.normal(0, 1, 1000)
    X0 = Z + np.random.normal(0, 1, 1000)
    X1 = Z + 2 * X0 + np.random.normal(0, 1, 1000)
    X2 = X0 + X1
    data = pd.DataFrame({"Z": Z, "X0": X0, "X1": X1, "X2": X2})

    # Graph without the X0 -> X1 connection.
    # The ground truth noise is set for X0 and X1 to further emphasize the misspecification; otherwise the
    # inferred noise of X1 would depend on Z because of the missing connection with X0.
    causal_model_without = ProbabilisticCausalModel(nx.DiGraph([("Z", "X0"), ("Z", "X1"), ("X0", "X2"), ("X1", "X2")]))
    causal_model_without.set_causal_mechanism("X0", unit_gaussian_anm())
    causal_model_without.set_causal_mechanism("X1", unit_gaussian_anm())
    assign_causal_mechanisms(causal_model_without, data)
    fit(causal_model_without, data)

    # Graph that models the X0 -> X1 connection explicitly.
    causal_model_with = ProbabilisticCausalModel(
        nx.DiGraph([("Z", "X0"), ("Z", "X1"), ("X0", "X1"), ("X0", "X2"), ("X1", "X2")])
    )
    causal_model_with.set_causal_mechanism("X0", unit_gaussian_anm())
    causal_model_with.set_causal_mechanism("X1", unit_gaussian_anm())
    assign_causal_mechanisms(causal_model_with, data, override_models=False)
    fit(causal_model_with, data)

    strength_missing_edge = arrow_strength(causal_model_without, "X2", parent_samples=data)
    strength_with_edge = arrow_strength(causal_model_with, "X2")

    for edge, tolerance in ((("X0", "X2"), 0.2), (("X1", "X2"), 1)):
        assert strength_missing_edge[edge] == approx(strength_with_edge[edge], abs=tolerance)
@flaky(max_runs=3)
def test_given_gcm_with_misspecified_mechanism_when_evaluate_arrow_strength_with_observational_data_then_gives_expected_results():
    """Arrow strengths computed on observed parent samples should match the variances in the data even
    though the X0 mechanism is deliberately set with the wrong scale."""

    def variance_difference(x, y):
        return np.var(y) - np.var(x)

    scm = ProbabilisticCausalModel(nx.DiGraph([("X1", "X2"), ("X0", "X2")]))
    # The X0 mechanism is misspecified on purpose: scale is 1, while the data below uses 2.
    scm.set_causal_mechanism("X0", ScipyDistribution(stats.norm, loc=0, scale=1))
    scm.set_causal_mechanism("X1", ScipyDistribution(stats.norm, loc=0, scale=1))
    scm.set_causal_mechanism("X2", AdditiveNoiseModel(prediction_model=create_linear_regressor()))

    X0 = np.random.normal(0, 2, 2000)  # Actual standard deviation in the data is 2.
    X1 = np.random.normal(0, 1, 2000)
    X2 = X0 + X1 + np.random.normal(0, 0.2, X0.shape[0])
    test_data = pd.DataFrame({"X0": X0, "X1": X1, "X2": X2})
    fit(scm, test_data)

    # Providing the observational data as parent samples mitigates the misspecified mechanism.
    causal_strengths = arrow_strength(
        scm, "X2", parent_samples=test_data, difference_estimation_func=variance_difference
    )

    assert causal_strengths[("X0", "X2")] == approx(4, abs=0.5)
    assert causal_strengths[("X1", "X2")] == approx(1, abs=0.1)
def _create_causal_model():
causal_model = ProbabilisticCausalModel(nx.DiGraph([("X1", "X2"), ("X0", "X2")]))
causal_model.set_causal_mechanism("X1", ScipyDistribution(stats.norm, loc=0, scale=1))

Просмотреть файл

@ -1,6 +1,7 @@
import networkx as nx
import numpy as np
import pandas as pd
from _pytest.python_api import approx
from flaky import flaky
from pytest import mark
from sklearn.ensemble import HistGradientBoostingClassifier, HistGradientBoostingRegressor
@ -8,8 +9,8 @@ from sklearn.linear_model import ElasticNetCV, LassoCV, LinearRegression, Logist
from sklearn.naive_bayes import GaussianNB
from sklearn.pipeline import Pipeline
from dowhy.gcm import ProbabilisticCausalModel
from dowhy.gcm.auto import AssignmentQuality, assign_causal_mechanisms
from dowhy.gcm import ProbabilisticCausalModel, draw_samples, fit
from dowhy.gcm.auto import AssignmentQuality, assign_causal_mechanisms, has_linear_relationship
def _generate_linear_regression_data(num_samples=1000):
@ -219,3 +220,92 @@ def test_when_using_best_quality_then_returns_auto_gluon_model():
causal_model, pd.DataFrame({"X": [1], "Y": ["Class 1"]}), quality=AssignmentQuality.BEST, override_models=True
)
assert isinstance(causal_model.causal_mechanism("Y").classifier_model, AutoGluonClassifier)
@flaky(max_runs=3)
def test_given_linear_gaussian_data_when_fit_scm_with_auto_assigned_models_with_default_parameters_then_generate_samples_with_correct_statistics():
    """Samples drawn from an auto-assigned, fitted model should reproduce the mean and standard
    deviation of every node of the linear training data."""
    X0 = np.random.normal(0, 1, 2000)
    X1 = 2 * X0 + np.random.normal(0, 0.2, 2000)
    X2 = 0.5 * X0 + np.random.normal(0, 0.2, 2000)
    X3 = 0.5 * X2 + np.random.normal(0, 0.2, 2000)
    observed = {"X0": X0, "X1": X1, "X2": X2, "X3": X3}
    original_observations = pd.DataFrame(observed)

    scm = ProbabilisticCausalModel(nx.DiGraph([("X0", "X1"), ("X0", "X2"), ("X2", "X3")]))
    assign_causal_mechanisms(scm, original_observations)
    fit(scm, original_observations)

    generated_samples = draw_samples(scm, 2000)

    # Check first and second moments node by node against the raw training arrays.
    for node, values in observed.items():
        assert np.mean(generated_samples[node]) == approx(np.mean(values), abs=0.1)
        assert np.std(generated_samples[node]) == approx(np.std(values), abs=0.1)
@flaky(max_runs=3)
def test_given_nonlinear_gaussian_data_when_fit_scm_with_auto_assigned_models_with_default_parameters_then_generate_samples_with_correct_statistics():
    """Samples drawn from an auto-assigned, fitted model should reproduce the mean and standard
    deviation of every node of the nonlinear (sine / quadratic) training data."""
    X0 = np.random.normal(0, 1, 2000)
    X1 = np.sin(2 * X0) + np.random.normal(0, 0.2, 2000)
    X2 = 0.5 * X0**2 + np.random.normal(0, 0.2, 2000)
    X3 = 0.5 * X2 + np.random.normal(0, 0.2, 2000)
    observed = {"X0": X0, "X1": X1, "X2": X2, "X3": X3}
    original_observations = pd.DataFrame(observed)

    scm = ProbabilisticCausalModel(nx.DiGraph([("X0", "X1"), ("X0", "X2"), ("X2", "X3")]))
    assign_causal_mechanisms(scm, original_observations)
    fit(scm, original_observations)

    generated_samples = draw_samples(scm, 2000)

    # Check first and second moments node by node against the raw training arrays.
    for node, values in observed.items():
        assert np.mean(generated_samples[node]) == approx(np.mean(values), abs=0.1)
        assert np.std(generated_samples[node]) == approx(np.std(values), abs=0.1)
def test_givne_simple_data_when_apply_has_linear_relationship_then_returns_expected_results():
    """A scaled copy of the input is reported as linear, its square is not."""
    samples = np.random.random(1000)

    doubled = 2 * samples
    assert has_linear_relationship(samples, doubled)

    squared = samples**2
    assert not has_linear_relationship(samples, squared)
@flaky(max_runs=3)
def test_given_categorical_data_when_calling_has_linear_relationship_then_returns_correct_results():
    """A class label defined by the sign of X1 + X2 is reported as linear, one defined by the sign of
    X1 * X2 is not."""
    X1 = np.random.normal(0, 1, 1000)
    X2 = np.random.normal(0, 1, 1000)
    features = np.column_stack([X1, X2])

    sum_sign_labels = (X1 + X2 > 0).astype(str)
    assert has_linear_relationship(features, sum_sign_labels)

    product_sign_labels = (X1 * X2 > 0).astype(str)
    assert not has_linear_relationship(features, product_sign_labels)
def test_given_imbalanced_categorical_data_when_calling_has_linear_relationship_then_does_not_raise_exception():
    """has_linear_relationship returns a truthy result on heavily imbalanced class labels."""
    # 1000 samples of one class plus a single sample of a rare class.
    majority_inputs = np.random.normal(0, 1, 1000)
    majority_labels = np.array(["OneClass"] * 1000)
    inputs = np.append(majority_inputs, 0)
    labels = np.append(majority_labels, "RareClass")
    assert has_linear_relationship(inputs, labels)

    # 100000 samples of one class plus 100 rare-class samples concentrated tightly around zero.
    majority_inputs = np.random.normal(0, 1, 100000)
    majority_labels = np.array(["OneClass"] * 100000)
    inputs = np.append(majority_inputs, np.random.normal(0, 0.000001, 100))
    labels = np.append(majority_labels, np.array(["RareClass"] * 100))
    assert has_linear_relationship(inputs, labels)
def test_given_data_with_rare_categorical_features_when_calling_has_linear_relationship_then_does_not_raise_exception():
    """has_linear_relationship returns a truthy result when every categorical feature value occurs once."""
    feature_names = np.array([f"Feature{index}" for index in range(20)])
    first_half = np.array(["Class1"] * 10)
    second_half = np.array(["Class2"] * 10)
    class_labels = np.append(first_half, second_half)

    assert has_linear_relationship(feature_names, class_labels)

Просмотреть файл

@ -13,6 +13,7 @@ from dowhy.gcm import (
fit,
)
from dowhy.gcm.auto import AssignmentQuality
from dowhy.gcm.distribution_change import mechanism_change_test
from dowhy.gcm.ml import create_linear_regressor
from dowhy.gcm.shapley import ShapleyConfig
@ -152,3 +153,36 @@ def _generate_data():
outlier_observations = pd.DataFrame({"X0": X0, "X1": X1, "X2": X2, "X3": X3})
return original_observations, outlier_observations
@flaky(max_runs=3)
def test_given_data_where_mechanism_changed_when_apply_mechanism_change_test_then_returns_correct_p_values():
    """A change of the linear coefficient (0.5 -> 2) yields a p-value <= 0.05; identical data yields > 0.05."""
    parent_before = np.random.uniform(-1, 1, 500)
    child_before = 0.5 * parent_before + np.random.normal(0, 0.1, 500)

    parent_after = np.random.uniform(-1, 1, 500)
    child_after = 2 * parent_after + np.random.normal(0, 0.1, 500)

    p_value_changed = mechanism_change_test(child_before, child_after, parent_before, parent_after)
    assert p_value_changed <= 0.05

    p_value_unchanged = mechanism_change_test(child_before, child_before, parent_before, parent_before)
    assert p_value_unchanged > 0.05
@flaky(max_runs=3)
def test_given_data_where_root_node_changed_when_apply_mechanism_change_test_then_returns_correct_p_values():
    """A widened uniform range of a parentless node yields a p-value <= 0.05; identical data yields > 0.05."""
    samples_before = np.random.uniform(-1, 1, 500)
    samples_after = np.random.uniform(-2, 2, 500)

    p_value_changed = mechanism_change_test(samples_before, samples_after)
    assert p_value_changed <= 0.05

    p_value_unchanged = mechanism_change_test(samples_before, samples_before)
    assert p_value_unchanged > 0.05
@flaky(max_runs=3)
def test_given_data_where_noise_changed_when_apply_mechanism_change_test_then_returns_correct_p_values():
    """A change of the noise scale (0.1 -> 1) yields a p-value <= 0.05; identical data yields > 0.05."""
    parent_before = np.random.uniform(-1, 1, 500)
    child_before = 2 * parent_before + np.random.normal(0, 0.1, 500)

    parent_after = np.random.uniform(-1, 1, 500)
    child_after = 2 * parent_after + np.random.normal(0, 1, 500)

    p_value_changed = mechanism_change_test(child_before, child_after, parent_before, parent_after)
    assert p_value_changed <= 0.05

    p_value_unchanged = mechanism_change_test(child_before, child_before, parent_before, parent_before)
    assert p_value_unchanged > 0.05

Просмотреть файл

@ -7,6 +7,7 @@ from dowhy.gcm.divergence import (
estimate_kl_divergence_categorical,
estimate_kl_divergence_continuous,
estimate_kl_divergence_of_probabilities,
is_probability_matrix,
)
@ -60,3 +61,10 @@ def test_given_probability_vectors_when_auto_estimate_kl_divergence_then_correct
np.array([[0.25, 0.5, 0.125, 0.125], [0.5, 0.25, 0.125, 0.125]]),
np.array([[0.5, 0.25, 0.125, 0.125], [0.25, 0.5, 0.125, 0.125]]),
) == approx(0.25 * np.log(0.25 / 0.5) + 0.5 * np.log(0.5 / 0.25), abs=0.01)
def test_given_valid_and_invalid_probability_vectors_when_apply_is_probabilities_then_return_expected_results():
    """is_probability_matrix accepts vectors/rows summing to one and rejects the other inputs below."""
    normalized_vector = np.array([0.5, 0.3, 0.2])
    assert is_probability_matrix(normalized_vector)

    # Entries sum to 0.6 only.
    unnormalized_vector = np.array([0.1, 0.3, 0.2])
    assert not is_probability_matrix(unnormalized_vector)

    normalized_rows = np.array([[0.5, 0.3, 0.2], [0.1, 0.2, 0.7]])
    assert is_probability_matrix(normalized_rows)

    gaussian_matrix = np.random.normal(0, 1, (5, 3))
    assert not is_probability_matrix(gaussian_matrix)

Просмотреть файл

@ -82,38 +82,8 @@ def test_when_using_parent_relevance_with_categorical_data_then_returns_correct_
assert noise == approx(0, abs=0.05)
@flaky(max_runs=5)
def test_when_using_parent_relevance_with_confidence_intervals_then_returns_reasonable_bounds():
    """Bootstrap the parent relevance of X2 (including its noise term) and check medians and intervals."""
    scm = StructuralCausalModel(nx.DiGraph([("X1", "X2"), ("X0", "X2")]))
    scm.set_causal_mechanism("X1", ScipyDistribution(stats.norm, loc=0, scale=1))
    scm.set_causal_mechanism("X0", ScipyDistribution(stats.norm, loc=0, scale=1))
    scm.set_causal_mechanism("X2", AdditiveNoiseModel(prediction_model=create_linear_regressor()))

    X0 = np.random.normal(0, 1, 1000)
    X1 = np.random.normal(0, 1, 1000)
    fit(scm, pd.DataFrame({"X0": X0, "X1": X1, "X2": 3 * X0 + X1}))

    def relevance_including_noise():
        # Merge the separately returned noise relevance into the per-parent dictionary.
        relevances, noise_relevance = parent_relevance(scm, "X2")
        relevances[("noise", "X2")] = noise_relevance
        return relevances

    median_relevance, cis = confidence_intervals(relevance_including_noise, num_bootstrap_resamples=10)

    # Contributions should add up to Var(X2).
    expected_medians = [
        (("X0", "X2"), 9, 1),
        (("X1", "X2"), 1, 0.3),
        (("noise", "X2"), 0, 0.5),
    ]
    for key, value, tolerance in expected_medians:
        assert median_relevance[key] == approx(value, abs=tolerance)

    expected_intervals = [
        (("X0", "X2"), [8.5, 9.5], 1),
        (("X1", "X2"), [0.8, 1.2], 0.4),
        (("noise", "X2"), [-0.2, 0.2], 0.4),
    ]
    for key, bounds, tolerance in expected_intervals:
        assert cis[key] == approx(np.array(bounds), abs=tolerance)
@flaky(max_runs=5)
def test_feature_relevance_sample_mean_diff():
@flaky(max_runs=3)
def test_when_given_linear_data_when_estimate_feature_relevance_per_sample_with_mean_diff_then_returns_expected_values():
num_vars = 15
X = np.random.normal(0, 1, (1000, num_vars))
coefficients = np.random.choice(20, num_vars) - 10
@ -171,7 +141,7 @@ def test_given_baseline_values_when_estimating_feature_relevance_sample_with_mea
@flaky(max_runs=5)
def test_feature_relevance_sample_mean_diff_with_certain_batch_size():
def test_given_specific_batch_size_when_estimate_feature_relevance_per_sample_then_returns_expected_results():
X = np.random.normal(0, 1, (1000, 3))
coefficients = np.random.choice(20, 3) - 10

Просмотреть файл

@ -55,3 +55,15 @@ def test_given_a_directed_graph_when_checking_if_a_node_is_root_then_returns_tru
assert is_root_node(graph, "X") == True
assert is_root_node(graph, "Y") == True
assert is_root_node(graph, "Z") == False
def test_when_set_and_get_causal_model_then_the_set_model_is_returned():
    """The mechanism read back from a node equals the one that was assigned to it."""
    single_node_graph = nx.DiGraph()
    single_node_graph.add_node("X0")
    causal_model = ProbabilisticCausalModel(single_node_graph)

    assigned_mechanism = EmpiricalDistribution()
    causal_model.set_causal_mechanism("X0", assigned_mechanism)

    retrieved_mechanism = causal_model.causal_mechanism("X0")
    assert retrieved_mechanism == assigned_mechanism

228
tests/gcm/test_noise.py Normal file
Просмотреть файл

@ -0,0 +1,228 @@
import networkx as nx
import numpy as np
import pandas as pd
from _pytest.python_api import approx
from flaky import flaky
from dowhy.gcm import (
AdditiveNoiseModel,
DirectedGraph,
EmpiricalDistribution,
InvertibleStructuralCausalModel,
StructuralCausalModel,
fit,
)
from dowhy.gcm._noise import compute_data_from_noise, compute_noise_from_data, get_noise_dependent_function
from dowhy.gcm.auto import assign_causal_mechanisms
from dowhy.gcm.graph import PARENTS_DURING_FIT, get_ordered_predecessors
from dowhy.gcm.ml import (
create_linear_regressor,
create_linear_regressor_with_given_parameters,
create_logistic_regression_classifier,
)
def test_given_data_with_known_noise_values_when_compute_data_from_noise_then_returns_correct_values():
    """Feeding the true noise through mechanisms with the true coefficients must reproduce the observations."""
    N0 = np.random.uniform(-1, 1, 1000)
    N1 = np.random.normal(0, 0.1, 1000)
    N2 = np.random.normal(0, 0.1, 1000)
    N3 = np.random.normal(0, 0.1, 1000)
    noise_observations = pd.DataFrame({"X0": N0, "X1": N1, "X2": N2, "X3": N3})

    X0 = N0
    X1 = 2 * X0 + N1
    X2 = 0.5 * X0 + N2
    X3 = 0.5 * X2 + N3
    original_observations = pd.DataFrame({"X0": X0, "X1": X1, "X2": X2, "X3": X3})

    scm = StructuralCausalModel(nx.DiGraph([("X0", "X1"), ("X0", "X2"), ("X2", "X3")]))
    scm.set_causal_mechanism("X0", EmpiricalDistribution())
    # Non-root nodes get linear models with the same coefficients used to generate the data above.
    for node, coefficient in (("X1", 2), ("X2", 0.5), ("X3", 0.5)):
        regressor = create_linear_regressor_with_given_parameters(np.array([coefficient]))
        scm.set_causal_mechanism(node, AdditiveNoiseModel(prediction_model=regressor))

    # fit() is not called here, so the parent information is stored on the graph manually.
    _persist_parents(scm.graph)

    estimated_samples = compute_data_from_noise(scm, noise_observations)

    for node in original_observations:
        expected_values = original_observations[node].to_numpy()
        assert estimated_samples[node].to_numpy() == approx(expected_values)
def test_given_data_with_known_noise_values_when_compute_noise_from_data_then_reconstruct_correct_noise_values():
    """Inverting mechanisms with the true coefficients on the observations must recover the true noise."""
    N0 = np.random.uniform(-1, 1, 1000)
    N1 = np.random.normal(0, 0.1, 1000)
    N2 = np.random.normal(0, 0.1, 1000)
    N3 = np.random.normal(0, 0.1, 1000)

    X0 = N0
    X1 = 2 * X0 + N1
    X2 = 0.5 * X0 + N2
    X3 = 0.5 * X2 + N3
    original_observations = pd.DataFrame({"X0": X0, "X1": X1, "X2": X2, "X3": X3})

    scm = InvertibleStructuralCausalModel(nx.DiGraph([("X0", "X1"), ("X0", "X2"), ("X2", "X3")]))
    scm.set_causal_mechanism("X0", EmpiricalDistribution())
    # Non-root nodes get linear models with the same coefficients used to generate the data above.
    for node, coefficient in (("X1", 2), ("X2", 0.5), ("X3", 0.5)):
        regressor = create_linear_regressor_with_given_parameters(np.array([coefficient]))
        scm.set_causal_mechanism(node, AdditiveNoiseModel(prediction_model=regressor))

    # fit() is not called here, so the parent information is stored on the graph manually.
    _persist_parents(scm.graph)

    estimated_noise_samples = compute_noise_from_data(scm, original_observations)

    for node, true_noise in (("X0", N0), ("X1", N1), ("X2", N2), ("X3", N3)):
        assert estimated_noise_samples[node].to_numpy() == approx(true_noise)
@flaky(max_runs=3)
def test_given_continuous_variables_when_get_noise_dependent_function_then_represents_correct_function():
    """Check that the noise dependent function of X3 maps inputs for X0, X2 and X3 to the expected X3 values.

    The input columns are arranged according to the node order returned by get_noise_dependent_function
    instead of relying on a hard-coded column order, so the test does not silently depend on the order in
    which the function enumerates the nodes.
    """
    X0 = np.random.normal(0, 1, 2000)
    X1 = X0 + np.random.normal(0, 0.1, 2000)
    X2 = 0.5 * X0 + np.random.normal(0, 0.1, 2000)
    X3 = 0.5 * X2 + np.random.normal(0, 0.1, 2000)
    data = pd.DataFrame({"X0": X0, "X1": X1, "X2": X2, "X3": X3})

    causal_model = StructuralCausalModel(nx.DiGraph([("X0", "X1"), ("X0", "X2"), ("X2", "X3")]))
    assign_causal_mechanisms(causal_model, data)
    fit(causal_model, data)

    # Columns are labeled by node. With the coefficients used to generate the data, the expected output per
    # row is 0.5 * (0.5 * X0 + X2) + X3.
    input_data = pd.DataFrame(np.array([[0, 0, 0], [0, 0, 2], [1, 0, 0], [1, 2, 0]]), columns=["X0", "X2", "X3"])
    expected_values = np.array([0, 2, 0.25, 1.25])

    fn, parent_order = get_noise_dependent_function(causal_model, "X3")
    assert set(parent_order) == {"X0", "X2", "X3"}
    # Reorder the columns to match the order the returned function expects.
    assert fn(input_data[parent_order].to_numpy()) == approx(expected_values, abs=0.1)

    fn, parent_order = get_noise_dependent_function(
        causal_model, "X3", approx_prediction_model=create_linear_regressor()
    )
    assert fn(input_data[parent_order].to_numpy()).reshape(-1) == approx(expected_values, abs=0.1)
@flaky(max_runs=3)
def test_given_continuous_and_categorical_variables_when_get_noise_dependent_function_then_represents_correct_function():
    """Check the noise dependent function of a categorical node X3 with a continuous parent X0, a
    categorical parent X1 and a categorical intermediate node X2, with and without an approximation model."""
    causal_model = StructuralCausalModel(nx.DiGraph([("X0", "X2"), ("X1", "X2"), ("X2", "X3")]))

    X0 = np.random.normal(0, 1, 1000)
    X1 = np.random.choice(2, 1000).astype(str)

    # X2: the category of X1 shifts the mean of a latent draw, which selects the threshold applied to X0.
    x2_values = []
    for x0, x1 in zip(X0, X1):
        latent = np.random.normal(0, 1) if x1 == "0" else np.random.normal(1, 1)
        x2_values.append((x0 + 2 > 0) if latent < 0.5 else (x0 - 2 > 0))
    X2 = np.array(x2_values).astype(str)

    # X3: the category of X2 shifts the mean of a latent draw, which is then thresholded at 0.5.
    x3_values = []
    for x2 in X2:
        latent = np.random.normal(0, 1) if x2 == "True" else np.random.normal(1, 1)
        x3_values.append("False" if latent < 0.5 else "True")
    X3 = np.array(x3_values).astype(str)

    data = pd.DataFrame({"X0": X0, "X1": X1, "X2": X2, "X3": X3})
    assign_causal_mechanisms(causal_model, data)
    fit(causal_model, data)

    function_inputs = pd.DataFrame(
        {"X0": [0, 0, 0, 0], "X1": ["0", "0", "0", "0"], "X2": [0, 0, 0, 1], "X3": [1, 0, 0.6, 0.6]}
    )
    expected_classes = np.array(["True", "False", "True", "False"])

    fn, parent_order = get_noise_dependent_function(causal_model, "X3")

    # The two root nodes may come in either order, followed by X2 and then X3.
    assert sorted(parent_order[:2]) == ["X0", "X1"]
    assert parent_order[2:] == ["X2", "X3"]

    predicted = fn(function_inputs[parent_order].to_numpy())
    assert np.all(predicted == expected_classes)

    fn, parent_order = get_noise_dependent_function(
        causal_model, "X3", approx_prediction_model=create_logistic_regression_classifier()
    )
    predicted = fn(function_inputs[parent_order].to_numpy()).reshape(-1)
    assert np.all(predicted == expected_classes)
def test_when_get_noise_dependent_function_then_correctly_omits_nodes():
    """Only X2 and the nodes upstream of it appear in the returned order, with parents before children."""
    # The values are arbitrary; only the set and order of the returned nodes is of interest.
    data = pd.DataFrame({f"X{index}": np.random.normal(0, 1, 10) for index in range(8)})

    graph = nx.DiGraph([("X0", "X1"), ("X1", "X2"), ("X3", "X2"), ("X4", "X5"), ("X6", "X5")])
    causal_model = StructuralCausalModel(graph)
    # X7 is an isolated node.
    causal_model.graph.add_node("X7")
    assign_causal_mechanisms(causal_model, data)
    fit(causal_model, data)

    _, parent_order = get_noise_dependent_function(causal_model, "X2")

    assert set(parent_order) == {"X0", "X1", "X2", "X3"}

    # Each (earlier, later) pair must appear in that relative order.
    for earlier, later in (("X0", "X1"), ("X0", "X2"), ("X1", "X2"), ("X3", "X2")):
        assert parent_order.index(later) > parent_order.index(earlier)
def test_given_nodes_names_are_ints_when_calling_noise_dependent_function_then_does_not_raise_key_error_exception():
    """Building and calling the noise dependent function works when node names are integers."""
    scm = StructuralCausalModel(nx.DiGraph([(1, 2)]))
    samples = pd.DataFrame({1: np.random.normal(0, 1, 10), 2: np.random.normal(0, 1, 10)})

    assign_causal_mechanisms(scm, samples)
    fit(scm, samples)

    # The test passes as long as neither call raises.
    fn, _ = get_noise_dependent_function(scm, 1)
    fn(np.array([[1]]))
def _persist_parents(graph: DirectedGraph):
    """Store, for every node of the graph, its ordered predecessors under the PARENTS_DURING_FIT key."""
    for current_node in graph.nodes:
        ordered_parents = get_ordered_predecessors(graph, current_node)
        graph.nodes[current_node][PARENTS_DURING_FIT] = ordered_parents

Просмотреть файл

@ -1,8 +1,27 @@
import random
import numpy as np
import pandas as pd
import pytest
from _pytest.python_api import approx
from dowhy.gcm.util.general import apply_one_hot_encoding, fit_one_hot_encoders, has_categorical, is_categorical
from dowhy.gcm.util.general import (
apply_one_hot_encoding,
fit_one_hot_encoders,
has_categorical,
is_categorical,
set_random_seed,
shape_into_2d,
)
@pytest.fixture
def preserve_random_generator_state():
    """Snapshot the global numpy and stdlib random generator states and restore both after the test."""
    saved_numpy_state, saved_stdlib_state = np.random.get_state(), random.getstate()

    yield

    np.random.set_state(saved_numpy_state)
    random.setstate(saved_stdlib_state)
def test_given_categorical_data_when_evaluating_is_categorical_then_returns_expected_result():
@ -35,3 +54,30 @@ def test_given_unknown_categorical_input_when_apply_one_hot_encoders_then_does_n
np.array([["a", 4, "f"]]),
fit_one_hot_encoders(np.array([["d", 1, "a"], ["b", 2, "d"], ["a", 3, "a"]], dtype=object)),
) == approx(np.array([[1, 0, 0, 4, 0, 0]]))
def test_when_apply_shape_into_2d_then_returns_correct_shape():
    """Check that shape_into_2d returns 2D arrays with the expected shape and values.

    np.array_equal is used instead of an element-wise `==` because the latter broadcasts: a 0-d or
    otherwise wrongly shaped result could still compare equal element-wise, so the shape would not
    actually be verified.
    """
    cases = [
        # (input, expected 2D result)
        (np.array(1), np.array([[1]])),
        (np.array([1, 2, 3, 4]), np.array([[1], [2], [3], [4]])),
        (np.array([[1], [2], [3], [4]]), np.array([[1], [2], [3], [4]])),
        (np.array([[1, 2], [1, 2], [1, 2], [1, 2]]), np.array([[1, 2], [1, 2], [1, 2], [1, 2]])),
    ]

    for given, expected in cases:
        assert np.array_equal(shape_into_2d(given), expected)
def test_given_3d_input_when_apply_shape_into_2d_then_raises_error_if_3d():
    """shape_into_2d raises a ValueError for a three-dimensional array."""
    three_dimensional = np.array([[[1], [2]], [[3], [4]]])

    with pytest.raises(ValueError):
        shape_into_2d(three_dimensional)
def test_when_set_random_seed_then_expect_same_random_values(preserve_random_generator_state):
    """Calling set_random_seed with the same seed reproduces both numpy and stdlib random draws."""

    def draw_after_seeding():
        # Seed first, then draw from numpy and afterwards from the stdlib generator.
        set_random_seed(0)
        numpy_values = np.random.random(10)
        stdlib_values = [random.randint(0, 100) for _ in range(10)]
        return numpy_values, stdlib_values

    numpy_vals1, random_vals1 = draw_after_seeding()
    numpy_vals2, random_vals2 = draw_after_seeding()

    assert numpy_vals1 == approx(numpy_vals2)
    assert random_vals1 == approx(random_vals2)