import operator
|
|
|
|
import networkx as nx
|
|
import numpy as np
|
|
import pandas as pd
|
|
from flaky import flaky
|
|
from pytest import approx
|
|
|
|
from dowhy.gcm import (
|
|
AdditiveNoiseModel,
|
|
EmpiricalDistribution,
|
|
InverseDensityScorer,
|
|
ITAnomalyScorer,
|
|
MeanDeviationScorer,
|
|
MedianCDFQuantileScorer,
|
|
MedianDeviationScorer,
|
|
ProbabilisticCausalModel,
|
|
RescaledMedianCDFQuantileScorer,
|
|
anomaly_scores,
|
|
auto,
|
|
fit,
|
|
)
|
|
from dowhy.gcm.anomaly import conditional_anomaly_scores
|
|
from dowhy.gcm.constant import EPS
|
|
from dowhy.gcm.distribution_change import estimate_distribution_change_scores
|
|
from dowhy.gcm.ml import create_linear_regressor
|
|
|
|
|
|
def test_given_outlier_observation_when_estimate_anomaly_scores_using_median_cdf_quantile_scorer_then_returns_expected_result():
    """X1 is the perturbed node in the outlier sample, so it should receive the highest anomaly score."""
    pcm, outlier_observation = _create_pcm_and_outlier_observation()

    scores = anomaly_scores(pcm, outlier_observation, anomaly_scorer_factory=MedianCDFQuantileScorer)
    assert max(scores.items(), key=lambda kv: kv[1])[0] == "X1"

    # Same check with the information-theoretic wrapper around the scorer.
    it_scores = anomaly_scores(
        pcm, outlier_observation, anomaly_scorer_factory=lambda: ITAnomalyScorer(MedianCDFQuantileScorer())
    )
    assert max(it_scores.items(), key=lambda kv: kv[1])[0] == "X1"

def test_given_outlier_observation_when_estimate_anomaly_scores_using_rescaled_median_cdf_quantile_scorer_then_returns_expected_result():
    """X1 is the perturbed node in the outlier sample, so it should receive the highest anomaly score."""
    pcm, outlier_observation = _create_pcm_and_outlier_observation()

    scores = anomaly_scores(pcm, outlier_observation, anomaly_scorer_factory=RescaledMedianCDFQuantileScorer)
    assert max(scores.items(), key=lambda kv: kv[1])[0] == "X1"

    # Same check with the information-theoretic wrapper around the scorer.
    it_scores = anomaly_scores(
        pcm,
        outlier_observation,
        anomaly_scorer_factory=lambda: ITAnomalyScorer(RescaledMedianCDFQuantileScorer()),
    )
    assert max(it_scores.items(), key=lambda kv: kv[1])[0] == "X1"

def test_given_outlier_observation_when_estimate_anomaly_scores_using_mean_deviation_scorer_then_returns_expected_result():
    """X1 is the perturbed node in the outlier sample, so it should receive the highest anomaly score."""
    pcm, outlier_observation = _create_pcm_and_outlier_observation()

    scores = anomaly_scores(pcm, outlier_observation, anomaly_scorer_factory=lambda: MeanDeviationScorer())
    assert max(scores.items(), key=lambda kv: kv[1])[0] == "X1"

    # Same check with the information-theoretic wrapper around the scorer.
    it_scores = anomaly_scores(
        pcm, outlier_observation, anomaly_scorer_factory=lambda: ITAnomalyScorer(MeanDeviationScorer())
    )
    assert max(it_scores.items(), key=lambda kv: kv[1])[0] == "X1"

def test_given_outlier_observation_when_estimate_anomaly_scores_using_median_deviation_scorer_then_returns_expected_result():
    """X1 is the perturbed node in the outlier sample, so it should receive the highest anomaly score."""
    pcm, outlier_observation = _create_pcm_and_outlier_observation()

    scores = anomaly_scores(pcm, outlier_observation, anomaly_scorer_factory=lambda: MedianDeviationScorer())
    assert max(scores.items(), key=lambda kv: kv[1])[0] == "X1"

    # Same check with the information-theoretic wrapper around the scorer.
    it_scores = anomaly_scores(
        pcm, outlier_observation, anomaly_scorer_factory=lambda: ITAnomalyScorer(MedianDeviationScorer())
    )
    assert max(it_scores.items(), key=lambda kv: kv[1])[0] == "X1"

def test_given_outlier_observation_when_estimate_anomaly_scores_using_inverse_density_scorer_then_returns_expected_result():
    """X1 is the perturbed node in the outlier sample, so it should receive the highest anomaly score."""
    pcm, outlier_observation = _create_pcm_and_outlier_observation()

    scores = anomaly_scores(pcm, outlier_observation, anomaly_scorer_factory=lambda: InverseDensityScorer())
    assert max(scores.items(), key=lambda kv: kv[1])[0] == "X1"

def test_given_data_with_change_in_mechanism_when_estimate_distribution_change_score_then_returns_expected_result():
    """Changing X2's mechanism (coefficient 0.5 -> 2) should make X2 the top-scored node."""
    causal_model, original_observations = _create_scm_for_distribution_change()

    X0 = np.random.uniform(-1, 1, 1000)
    X1 = 2 * X0 + np.random.normal(0, 0.1, 1000)
    # Mechanism change: X2's coefficient on X0 goes from 0.5 to 2.
    X2 = 2 * X0 + np.random.normal(0, 0.1, 1000)
    X3 = 0.5 * X2 + np.random.normal(0, 0.1, 1000)
    outlier_observations = pd.DataFrame({"X0": X0, "X1": X1, "X2": X2, "X3": X3})

    change_scores = estimate_distribution_change_scores(causal_model, original_observations, outlier_observations)
    assert max(change_scores.items(), key=lambda kv: kv[1])[0] == "X2"

def test_given_data_with_change_in_mechanism_when_estimate_distribution_change_score_using_difference_in_means_then_returns_expected_result():
    """An additive offset of 2 in X2's mechanism should yield a mean-difference score of ~2 for X2 only."""
    causal_model, original_observations = _create_scm_for_distribution_change()

    X0 = np.random.uniform(-1, 1, 1000)
    X1 = 2 * X0 + np.random.normal(0, 0.1, 1000)
    # Mechanism change: an additive offset of 2 is introduced for X2.
    X2 = 2 + 0.5 * X0 + np.random.normal(0, 0.1, 1000)
    X3 = 0.5 * X2 + np.random.normal(0, 0.1, 1000)
    outlier_observations = pd.DataFrame({"X0": X0, "X1": X1, "X2": X2, "X3": X3})

    scores = estimate_distribution_change_scores(
        causal_model,
        original_observations,
        outlier_observations,
        difference_estimation_func=lambda x, y: abs(np.mean(x) - np.mean(y)),
    )

    # Only X2's mechanism changed; upstream and downstream mechanisms are untouched.
    for node, expected in [("X0", 0), ("X1", 0), ("X2", 2), ("X3", 0)]:
        assert scores[node] == approx(expected, abs=0.1)

    assert max(scores.items(), key=lambda kv: kv[1])[0] == "X2"

def test_given_data_with_change_in_root_node_when_estimate_distribution_change_score_using_difference_in_means_then_returns_expected_result():
    """Shifting the root node's distribution should be attributed to X0 only, not to its descendants."""
    causal_model, original_observations = _create_scm_for_distribution_change()

    # Root-node change: X0's distribution shifts from U(-1, 1) to U(-1.5, 1).
    X0 = np.random.uniform(-1.5, 1, 1000)
    X1 = 2 * X0 + np.random.normal(0, 0.1, 1000)
    X2 = 0.5 * X0 + np.random.normal(0, 0.1, 1000)
    X3 = 0.5 * X2 + np.random.normal(0, 0.1, 1000)
    outlier_observations = pd.DataFrame({"X0": X0, "X1": X1, "X2": X2, "X3": X3})

    scores = estimate_distribution_change_scores(
        causal_model,
        original_observations,
        outlier_observations,
        difference_estimation_func=lambda x, y: abs(np.mean(x) - np.mean(y)),
    )

    # The mean of X0 moves by 0.25; the downstream mechanisms are unchanged.
    for node, expected in [("X0", 0.25), ("X1", 0), ("X2", 0), ("X3", 0)]:
        assert scores[node] == approx(expected, abs=0.1)

    assert max(scores.items(), key=lambda kv: kv[1])[0] == "X0"

@flaky(max_runs=3)
def test_given_graph_with_multiple_parents_when_estimate_distribution_change_scores_then_returns_expected_result():
    """With two parents feeding X2, only X2's shifted mechanism (offset +1) should score non-zero."""
    X0 = np.random.uniform(-1, 1, 1000)
    X1 = np.random.uniform(-1, 1, 1000)
    original_observations = pd.DataFrame({"X0": X0, "X1": X1, "X2": X0 + X1})

    X0 = np.random.uniform(-1, 1, 1000)
    X1 = np.random.uniform(-1, 1, 1000)
    # Mechanism change: X2 gains an additive offset of 1.
    outlier_observations = pd.DataFrame({"X0": X0, "X1": X1, "X2": X0 + X1 + 1})

    causal_model = ProbabilisticCausalModel(nx.DiGraph([("X0", "X2"), ("X1", "X2")]))
    auto.assign_causal_mechanisms(causal_model, original_observations, auto.AssignmentQuality.GOOD)
    fit(causal_model, original_observations)

    scores = estimate_distribution_change_scores(
        causal_model,
        original_observations,
        outlier_observations,
        difference_estimation_func=lambda x, y: abs(np.mean(x) - np.mean(y)),
    )

    assert scores["X0"] == 0
    assert scores["X1"] == 0
    assert scores["X2"] == approx(1, abs=0.005)

@flaky(max_runs=5)
def test_given_data_with_change_in_mechanism_when_estimate_distribution_change_score_using_difference_in_variance_then_returns_expected_result():
    """Raising X2's noise std from 0.1 to 2 should give a variance-difference score of ~4 for X2 only."""
    causal_model, original_observations = _create_scm_for_distribution_change()

    X0 = np.random.uniform(-1, 1, 1000)
    X1 = 2 * X0 + np.random.normal(0, 0.1, 1000)
    # Mechanism change: X2's noise standard deviation increases from 0.1 to 2.
    X2 = 0.5 * X0 + np.random.normal(0, 2, 1000)
    X3 = 0.5 * X2 + np.random.normal(0, 0.1, 1000)
    outlier_observations = pd.DataFrame({"X0": X0, "X1": X1, "X2": X2, "X3": X3})

    scores = estimate_distribution_change_scores(
        causal_model,
        original_observations,
        outlier_observations,
        difference_estimation_func=lambda x, y: abs(np.var(x) - np.var(y)),
    )

    assert scores["X0"] == approx(0, abs=0.1)
    assert scores["X1"] == approx(0, abs=0.1)
    # Variance grows by roughly 2**2 - 0.1**2 ~= 4.
    assert scores["X2"] == approx(4, abs=0.5)
    assert scores["X3"] == approx(0, abs=0.1)

    assert max(scores.items(), key=lambda kv: kv[1])[0] == "X2"

@flaky(max_runs=3)
def test_given_multivariate_inputs_when_estimate_anomaly_scores_then_does_not_raise_error():
    """This test verifies that estimate_anomaly_scores correctly handles multivariate input features, which
    caused problems in an earlier version."""

    causal_model = ProbabilisticCausalModel(nx.DiGraph([("X1", "X0"), ("X2", "X0"), ("X3", "X0"), ("X4", "X0")]))

    raw = np.random.normal(0, 1, (10000, 4))
    columns = {"X0": (raw[:, 0] + raw[:, 1] + raw[:, 2] + raw[:, 3]).reshape(-1)}
    for i in range(4):
        columns["X%d" % (i + 1)] = raw[:, i].reshape(-1)
    data = pd.DataFrame(columns)

    # A single sample where X1 is anomalous; X0 is consistent with X1 + X2 + X3 + X4.
    data_anomaly = pd.DataFrame({"X0": [13], "X1": [10], "X2": [1], "X3": [1], "X4": [1]})

    auto.assign_causal_mechanisms(causal_model, data, auto.AssignmentQuality.GOOD)
    fit(causal_model, data)

    scores = anomaly_scores(causal_model, data_anomaly)
    assert scores["X1"][0] == approx(-np.log(1 / (data.shape[0] + 1)), abs=3)

@flaky(max_runs=3)
def test_given_simple_linear_data_when_estimate_conditional_anomaly_scores_then_returns_expected_result():
    """Conditional anomaly scores of Y given X should match scoring the reconstructed noise Y - 2X."""
    X = np.random.normal(0, 1, 1000)
    N = np.random.normal(0, 1, 1000)
    Y = 2 * X + N

    causal_model = AdditiveNoiseModel(prediction_model=create_linear_regressor())
    causal_model.fit(X, Y)

    # Reference scorer fitted directly on the true noise.
    reference_scorer = MeanDeviationScorer()
    reference_scorer.fit(N)

    expected = reference_scorer.score(Y[:5] - 2 * X[:5])
    actual = conditional_anomaly_scores(X[:5], Y[:5], causal_model, MeanDeviationScorer).reshape(-1)
    assert actual == approx(expected, abs=0.1)

def _create_scm_for_distribution_change():
    """Build and fit a linear SCM on the graph X0 -> X1, X0 -> X2 -> X3 using freshly sampled data.

    Returns:
        Tuple of (fitted ProbabilisticCausalModel, reference observations DataFrame).
    """
    X0 = np.random.uniform(-1, 1, 1000)
    X1 = 2 * X0 + np.random.normal(0, 0.1, 1000)
    X2 = 0.5 * X0 + np.random.normal(0, 0.1, 1000)
    X3 = 0.5 * X2 + np.random.normal(0, 0.1, 1000)

    original_observations = pd.DataFrame({"X0": X0, "X1": X1, "X2": X2, "X3": X3})

    causal_model = ProbabilisticCausalModel(nx.DiGraph([("X0", "X1"), ("X0", "X2"), ("X2", "X3")]))
    causal_model.set_causal_mechanism("X0", EmpiricalDistribution())
    # All non-root nodes get linear additive noise mechanisms.
    for node in ("X1", "X2", "X3"):
        causal_model.set_causal_mechanism(node, AdditiveNoiseModel(prediction_model=create_linear_regressor()))
    fit(causal_model, original_observations)

    return causal_model, original_observations

def _create_pcm_and_outlier_observation():
    """Create a fitted PCM on X0 -> {X1, X2}, X2 -> X3 and a single outlier sample with a shifted X1.

    Returns:
        Tuple of (fitted ProbabilisticCausalModel, single-row DataFrame whose X1 value is anomalous).
    """
    X0 = np.random.uniform(-1, 1, 1000)
    X1 = 2 * X0 + np.random.normal(0, 0.1, 1000)
    X2 = 0.5 * X0 + np.random.normal(0, 0.1, 1000)
    X3 = 0.5 * X2 + np.random.normal(0, 0.1, 1000)

    original_observations = pd.DataFrame({"X0": X0, "X1": X1, "X2": X2, "X3": X3})
    # Creating an anomaly here: shift X1 by three standard deviations.
    # Fixed: compute the std of the array itself (np.std(X1)) instead of np.std(X1.data), which
    # passed the array's raw memoryview buffer and only worked by accident of coercion.
    outlier_observation = pd.DataFrame(
        {"X0": X0[:1], "X1": X1[:1] + np.std(X1) * 3, "X2": X2[:1], "X3": X3[:1]}
    )

    pcm = ProbabilisticCausalModel(nx.DiGraph([("X0", "X1"), ("X0", "X2"), ("X2", "X3")]))
    auto.assign_causal_mechanisms(pcm, original_observations)
    fit(pcm, original_observations)

    return pcm, outlier_observation