Add an API for unit change attribution
Signed-off-by: Kailash <111277+kailashbuki@users.noreply.github.com>
This commit is contained in:
Родитель
22211f09bb
Коммит
8c8552580e
|
@ -0,0 +1,108 @@
|
|||
"""This module provides the APIs for attributing the change in the output value of a deterministic mechanism for a statistical unit.
|
||||
"""
|
||||
|
||||
from abc import abstractmethod
|
||||
from typing import List, Optional
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from sklearn.linear_model._base import LinearModel
|
||||
from sklearn.utils.validation import check_is_fitted
|
||||
|
||||
from dowhy.gcm.fcms import PredictionModel
|
||||
from dowhy.gcm.ml.regression import SklearnRegressionModel
|
||||
from dowhy.gcm.shapley import ShapleyConfig, estimate_shapley_values
|
||||
|
||||
|
||||
class LinearPredictionModel:
    """Interface for prediction models that expose the coefficients of a linear map."""

    @property
    @abstractmethod
    def coefficients(self) -> np.ndarray:
        """Return the coefficients of the linear model.

        :return: The coefficients as a numpy array.
        :raises NotImplementedError: If the property has not been overridden.
        """
        # This class declares no ABC base, so @abstractmethod alone does not stop
        # instantiation here. Fail loudly instead of silently returning None.
        raise NotImplementedError
|
||||
|
||||
|
||||
class SklearnLinearRegressionModel(SklearnRegressionModel, LinearPredictionModel):
    """Regression model around a scikit-learn linear model that exposes its coefficients."""

    def __init__(self, sklearn_mdl: LinearModel) -> None:
        """Store the given scikit-learn linear model via the parent constructor.

        :param sklearn_mdl: The scikit-learn linear model to wrap.
        """
        super().__init__(sklearn_mdl)

    @property
    def coefficients(self) -> np.ndarray:
        """Return the ``coef_`` attribute of the wrapped model.

        ``check_is_fitted`` is called first, so an unfitted model raises
        instead of failing on the missing attribute.

        :return: The coefficients of the fitted model.
        """
        wrapped_model = self.sklearn_model
        check_is_fitted(wrapped_model)
        return wrapped_model.coef_
|
||||
|
||||
|
||||
def unit_change_nonlinear(
    background_mechanism: PredictionModel,
    background_df: pd.DataFrame,
    foreground_mechanism: PredictionModel,
    foreground_df: pd.DataFrame,
    input_column_names: List[str],
    shapley_config: Optional[ShapleyConfig] = None,
) -> pd.DataFrame:
    """
    Calculates the contributions of mechanism and each input to the change in the output values of a non-linear deterministic mechanism.
    The technical method is described in the following research paper:
    Kailash Budhathoki, George Michailidis, Dominik Janzing. *Explaining the root causes of unit-level changes*. arXiv, 2022.

    Rows of the background and foreground data are paired by position (first row with first row, and so on);
    the index labels of the dataframes are not used for alignment.

    :param background_mechanism: The background mechanism.
    :param background_df: The background data.
    :param foreground_mechanism: The foreground mechanism.
    :param foreground_df: The foreground data.
    :param input_column_names: The names of the input (features) columns in both dataframes.
    :param shapley_config: The configuration for calculating Shapley values.
    :return: A pandas dataframe with attributions to each cause for the change in each output row of provided dataframes.
             The columns are the input column names followed by "f" for the mechanism.
    :raises ValueError: If any of the input columns is missing in either dataframe.
    """
    _check_if_input_columns_exist(background_df, foreground_df, input_column_names)

    # Work on plain arrays so that rows are combined positionally. Concatenating
    # dataframe slices would align on index labels and produce NaNs or mispaired
    # rows whenever the two dataframes do not share the same index.
    background_inputs = background_df[input_column_names].to_numpy()  # n x p
    foreground_inputs = foreground_df[input_column_names].to_numpy()  # n x p

    def payoff(binary_vector: List[int]) -> np.ndarray:
        """The last cell in the binary vector represents the player 'mechanism'."""
        # A 1 selects the foreground value of the corresponding input column,
        # anything else the background value.
        take_foreground = np.asarray(binary_vector[:-1]) == 1
        mixed_inputs = np.where(take_foreground, foreground_inputs, background_inputs)
        mechanism = foreground_mechanism if binary_vector[-1] else background_mechanism
        return mechanism.predict(mixed_inputs).flatten()

    # One player per input column plus one player for the mechanism itself.
    contributions = estimate_shapley_values(payoff, len(input_column_names) + 1, shapley_config)
    root_causes = list(input_column_names) + ["f"]
    return pd.DataFrame(contributions, columns=root_causes)
|
||||
|
||||
|
||||
def unit_change_linear(
    background_mechanism: LinearPredictionModel,
    background_df: pd.DataFrame,
    foreground_mechanism: LinearPredictionModel,
    foreground_df: pd.DataFrame,
    input_column_names: List[str],
) -> pd.DataFrame:
    """
    Calculates the contributions of mechanism and each input to the change in the output values of a linear deterministic mechanism.

    Only the ``coefficients`` of the two mechanisms enter the computation. Rows of the background and
    foreground data are paired by position.

    :param background_mechanism: The linear background mechanism.
    :param background_df: The background data.
    :param foreground_mechanism: The linear foreground mechanism.
    :param foreground_df: The foreground data.
    :param input_column_names: The names of the input columns in both dataframes.
    :return: A pandas dataframe with attributions to each cause for the change in each output row of provided dataframes.
             The columns are the input column names followed by "f" for the mechanism. If "f" is itself an input
             column name, both columns are kept and the mechanism column is the last one.
    :raises ValueError: If any of the input columns is missing in either dataframe.
    """
    _check_if_input_columns_exist(background_df, foreground_df, input_column_names)

    # Flatten so that coefficient arrays of shape (p,), (1, p) or (p, 1) are all handled alike.
    background_coeffs = np.ravel(background_mechanism.coefficients)  # p
    foreground_coeffs = np.ravel(foreground_mechanism.coefficients)  # p
    coeffs_total = background_coeffs + foreground_coeffs  # p
    coeffs_diff = foreground_coeffs - background_coeffs  # p

    background_inputs = background_df[input_column_names].to_numpy()  # n x p
    foreground_inputs = foreground_df[input_column_names].to_numpy()  # n x p
    input_total = foreground_inputs + background_inputs  # n x p
    input_diff = foreground_inputs - background_inputs  # n x p

    # Input i gets the change in its value weighted by the average coefficient.
    contribution_input = 0.5 * input_diff * coeffs_total  # n x p
    # The mechanism gets the change in coefficients weighted by the average input, summed over inputs.
    contribution_mechanism = 0.5 * (input_total * coeffs_diff).sum(axis=1)  # n

    # Build all columns at once: assigning contribution_df["f"] afterwards would silently
    # overwrite the contribution of an input column that happens to be named "f".
    contributions = np.column_stack([contribution_input, contribution_mechanism])
    return pd.DataFrame(contributions, columns=list(input_column_names) + ["f"])
|
||||
|
||||
|
||||
def _check_if_input_columns_exist(
|
||||
background_df: pd.DataFrame, foreground_df: pd.DataFrame, input_column_names: List[str]
|
||||
) -> None:
|
||||
if not len(set(background_df.columns).intersection(input_column_names)) == len(input_column_names) or not len(
|
||||
set(foreground_df.columns).intersection(input_column_names)
|
||||
) == len(input_column_names):
|
||||
raise ValueError("Input column names not found in either the background or the foreground data.")
|
|
@ -0,0 +1,114 @@
|
|||
import numpy as np
|
||||
import pandas as pd
|
||||
import pytest
|
||||
from flaky import flaky
|
||||
from sklearn.ensemble import RandomForestRegressor as RFR
|
||||
from sklearn.exceptions import NotFittedError
|
||||
from sklearn.linear_model import LinearRegression
|
||||
|
||||
from dowhy.gcm.ml.regression import SklearnRegressionModel
|
||||
from dowhy.gcm.unit_change import SklearnLinearRegressionModel, unit_change_linear, unit_change_nonlinear
|
||||
|
||||
|
||||
@flaky(max_runs=5)
def test_given_fitted_linear_mechanisms_with_output_change_when_evaluate_unit_change_linear_method_then_returns_correct_attributions():
    """Check unit_change_linear against closed-form attributions for two noise-free linear mechanisms."""
    sample_count = 100

    # Background: C = 2A + 3B.
    a_background = np.random.normal(size=sample_count)
    b_background = np.random.normal(size=sample_count)
    c_background = 2 * a_background + 3 * b_background

    # Foreground: C = 3A + 2B.
    a_foreground = np.random.normal(size=sample_count)
    b_foreground = np.random.normal(size=sample_count)
    c_foreground = 3 * a_foreground + 2 * b_foreground

    background_df = pd.DataFrame(data={"A": a_background, "B": b_background})
    foreground_df = pd.DataFrame(data={"A": a_foreground, "B": b_foreground})

    background_regressor = LinearRegression(fit_intercept=False)
    background_regressor.fit(np.column_stack((a_background, b_background)), c_background)
    foreground_regressor = LinearRegression(fit_intercept=False)
    foreground_regressor.fit(np.column_stack((a_foreground, b_foreground)), c_foreground)

    actual_contributions = unit_change_linear(
        SklearnLinearRegressionModel(background_regressor),
        background_df,
        SklearnLinearRegressionModel(foreground_regressor),
        foreground_df,
        ["A", "B"],
    )

    # Inputs: half the summed coefficients times the input change.
    expected_a = (3 + 2) * (a_foreground - a_background) / 2
    expected_b = (2 + 3) * (b_foreground - b_background) / 2
    # Mechanism: half the coefficient change times the summed inputs, added over both inputs.
    expected_f = (a_background + a_foreground) * (3 - 2) / 2 + (b_background + b_foreground) * (2 - 3) / 2
    expected_contributions = pd.DataFrame(data={"A": expected_a, "B": expected_b, "f": expected_f})

    np.testing.assert_array_almost_equal(actual_contributions.to_numpy(), expected_contributions.to_numpy(), decimal=1)
|
||||
|
||||
|
||||
@flaky(max_runs=5)
def test_given_fitted_linear_mechanisms_with_output_change_when_evaluate_unit_change_linear_and_nonlinear_methods_then_attributions_are_consistent():
    """Check that the linear and the Shapley-based method agree on linear mechanisms."""
    sample_count = 100

    # Background: C = 2A + 3B.
    a_background = np.random.normal(size=sample_count)
    b_background = np.random.normal(size=sample_count)
    c_background = 2 * a_background + 3 * b_background

    # Foreground: C = 3A + 2B.
    a_foreground = np.random.normal(size=sample_count)
    b_foreground = np.random.normal(size=sample_count)
    c_foreground = 3 * a_foreground + 2 * b_foreground

    background_df = pd.DataFrame(data={"A": a_background, "B": b_background})
    foreground_df = pd.DataFrame(data={"A": a_foreground, "B": b_foreground})

    background_regressor = LinearRegression(fit_intercept=False)
    background_regressor.fit(np.column_stack((a_background, b_background)), c_background)
    background_mechanism = SklearnLinearRegressionModel(background_regressor)

    foreground_regressor = LinearRegression(fit_intercept=False)
    foreground_regressor.fit(np.column_stack((a_foreground, b_foreground)), c_foreground)
    foreground_mechanism = SklearnLinearRegressionModel(foreground_regressor)

    # Both methods receive exactly the same arguments.
    shared_arguments = (background_mechanism, background_df, foreground_mechanism, foreground_df, ["A", "B"])
    actual_contributions = unit_change_linear(*shared_arguments)
    expected_contributions = unit_change_nonlinear(*shared_arguments)

    np.testing.assert_array_almost_equal(actual_contributions.to_numpy(), expected_contributions.to_numpy(), decimal=1)
|
||||
|
||||
|
||||
def test_given_unfitted_mechanisms_when_evaluate_unit_change_methods_then_raises_exception():
    """Check that both unit change methods raise NotFittedError for unfitted mechanisms."""

    def random_inputs() -> pd.DataFrame:
        # Fresh 100-row frame with standard-normal columns A and B.
        return pd.DataFrame(data=dict(A=np.random.normal(size=100), B=np.random.normal(size=100)))

    with pytest.raises(NotFittedError):
        unit_change_linear(
            SklearnLinearRegressionModel(LinearRegression()),
            random_inputs(),
            SklearnLinearRegressionModel(LinearRegression()),
            random_inputs(),
            ["A", "B"],
        )

    # The Shapley-based method is exercised with an unfitted linear model and an unfitted random forest.
    for estimator_class in (LinearRegression, RFR):
        with pytest.raises(NotFittedError):
            unit_change_nonlinear(
                SklearnRegressionModel(estimator_class()),
                random_inputs(),
                SklearnRegressionModel(estimator_class()),
                random_inputs(),
                ["A", "B"],
            )
|
||||
|
||||
|
||||
def test_given_fitted_nonlinnear_mechanisms_when_evaluate_unit_change_linear_method_then_raises_exception():
    """Check that unit_change_linear raises AttributeError for fitted random forest mechanisms."""

    def fitted_forest_mechanism() -> SklearnRegressionModel:
        # Random forest fitted on 100 random rows with two features.
        forest = RFR().fit(np.random.normal(size=(100, 2)), np.random.normal(size=100))
        return SklearnRegressionModel(forest)

    def random_inputs() -> pd.DataFrame:
        # Fresh 100-row frame with standard-normal columns A and B.
        return pd.DataFrame(data=dict(A=np.random.normal(size=100), B=np.random.normal(size=100)))

    with pytest.raises(AttributeError):
        unit_change_linear(
            fitted_forest_mechanism(),
            random_inputs(),
            fitted_forest_mechanism(),
            random_inputs(),
            ["A", "B"],
        )
|
Загрузка…
Ссылка в новой задаче