Optimize backdoor criterion identification for large graphs (#342)

* Used the d-separation algorithm from networkx
* Updated backdoor identification to be faster
* Refactored code for the new function
* Changed the logic for unobserved confounding to simply remove the unobserved confounder when returning a backdoor set. Simplifies the logic.
This commit is contained in:
Amit Sharma 2022-01-03 09:10:16 +05:30 коммит произвёл GitHub
Родитель 95be035081
Коммит 8452f3948c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 2204 добавлений и 1647 удалений

Различия файлов скрыты, потому что одна или несколько строк слишком длинны

Просмотреть файл

@ -159,8 +159,11 @@ class CausalGraph:
if node_name not in common_cause_names:
for outcome in self.outcome_name:
self._graph.add_node(node_name, observed="yes")
self._graph.add_edge(node_name, outcome, style = "dotted", headport="s", tailport="n")
self._graph.add_edge(outcome, node_name, style = "dotted", headport="n", tailport="s") # TODO make the ports more general so that they apply not just to top-bottom node configurations
# Assuming the simple form of effect modifier
# that directly causes the outcome.
self._graph.add_edge(node_name, outcome)
#self._graph.add_edge(node_name, outcome, style = "dotted", headport="s", tailport="n")
#self._graph.add_edge(outcome, node_name, style = "dotted", headport="n", tailport="s") # TODO make the ports more general so that they apply not just to top-bottom node configurations
if mediator_names is not None:
for node_name in mediator_names:
for treatment, outcome in itertools.product(self.treatment_name, self.outcome_name):
@ -209,6 +212,7 @@ class CausalGraph:
def do_surgery(self, node_names, remove_outgoing_edges=False,
remove_incoming_edges=False):
node_names = parse_state(node_names)
new_graph = self._graph.copy()
for node_name in node_names:
if remove_outgoing_edges:
@ -236,15 +240,39 @@ class CausalGraph:
causes = causes.union(self.get_ancestors(v, new_graph=new_graph))
return causes
def check_valid_backdoor_set(self, nodes1, nodes2, nodes3, backdoor_paths=None):
def check_dseparation(self, nodes1, nodes2, nodes3, new_graph=None,
        dseparation_algo="default"):
    """Check whether nodes1 and nodes2 are d-separated given nodes3.

    :param nodes1: iterable of node names forming the first set.
    :param nodes2: iterable of node names forming the second set.
    :param nodes3: iterable of node names that are conditioned on.
    :param new_graph: graph on which to run the check; when None, this
        object's own graph (``self._graph``) is used.
    :param dseparation_algo: name of the d-separation method. Only
        "default" (networkx's ``d_separated``) is accepted here.
    :returns: the value returned by ``nx.algorithms.d_separated``.
    :raises ValueError: if ``dseparation_algo`` is not "default".
    """
    # Reject unknown algorithms up front, before touching the graph.
    if dseparation_algo != "default":
        raise ValueError(f"{dseparation_algo} method for d-separation not supported.")
    graph = self._graph if new_graph is None else new_graph
    # networkx expects sets of nodes, so coerce whatever iterables were given.
    return nx.algorithms.d_separated(
        graph, set(nodes1), set(nodes2), set(nodes3))
def check_valid_backdoor_set(self, nodes1, nodes2, nodes3,
backdoor_paths=None, new_graph=None, dseparation_algo="default"):
""" Assume that the first parameter (nodes1) is the treatment,
the second is the outcome, and the third is the candidate backdoor set
"""
# also return the number of backdoor paths blocked by observed nodes
if dseparation_algo == "default":
if new_graph is None:
# Assume that nodes1 is the treatment
new_graph = self.do_surgery(nodes1,
remove_outgoing_edges=True)
dseparated = nx.algorithms.d_separated(new_graph,
set(nodes1), set(nodes2), set(nodes3))
elif dseparation_algo == "naive":
# ignores new_graph parameter, always uses self._graph
if backdoor_paths is None:
backdoor_paths = self.get_backdoor_paths(nodes1, nodes2)
d_separated = all([self.is_blocked(path, nodes3) for path in backdoor_paths])
observed_nodes3 = self.filter_unobserved_variables(nodes3)
num_paths_blocked = sum([self.is_blocked(path, observed_nodes3) for path in backdoor_paths])
return {'is_dseparated': d_separated,
'num_paths_blocked_by_observed_nodes': num_paths_blocked}
dseparated = all([self.is_blocked(path, nodes3) for path in backdoor_paths])
else:
raise ValueError(f"{dseparation_algo} method for d-separation not supported.")
return {'is_dseparated': dseparated}
def get_backdoor_paths(self, nodes1, nodes2):
paths = []
@ -406,8 +434,9 @@ class CausalGraph:
Currently only supports singleton sets.
"""
dpaths = self.get_all_directed_paths(nodes1, nodes2)
return len(dpaths) > 0
#dpaths = self.get_all_directed_paths(nodes1, nodes2)
#return len(dpaths) > 0
return nx.has_path(self._graph, nodes1[0], nodes2[0])
def get_adjacency_matrix(self, *args, **kwargs):
'''
@ -416,14 +445,25 @@ class CausalGraph:
'''
return nx.convert_matrix.to_numpy_matrix(self._graph, *args, **kwargs)
def check_valid_frontdoor_set(self, nodes1, nodes2, candidate_nodes, frontdoor_paths=None):
def check_valid_frontdoor_set(self, nodes1, nodes2, candidate_nodes,
frontdoor_paths=None, new_graph = None,
dseparation_algo="default"):
"""Check if valid the frontdoor variables for set of treatments, nodes1 to set of outcomes, nodes2.
"""
# Condition 1: node 1 ---> node 2 is intercepted by candidate_nodes
if dseparation_algo == "default":
if new_graph is None:
new_graph = self._graph
dseparated = nx.algorithms.d_separated(new_graph,
set(nodes1), set(nodes2), set(candidate_nodes))
elif dseparation_algo == "naive":
if frontdoor_paths is None:
frontdoor_paths = self.get_all_directed_paths(nodes1, nodes2)
d_separated = all([self.is_blocked(path, candidate_nodes) for path in frontdoor_paths])
return d_separated
dseparated = all([self.is_blocked(path, candidate_nodes) for path in frontdoor_paths])
else:
raise ValueError(f"{dseparation_algo} method for d-separation not supported.")
return dseparated
def check_valid_mediation_set(self, nodes1, nodes2, candidate_nodes, mediation_paths=None):
"""Check if candidate nodes are valid mediators for set of treatments, nodes1 to set of outcomes, nodes2.

Просмотреть файл

@ -30,7 +30,7 @@ class CausalIdentifier:
BACKDOOR_MIN="minimal-adjustment"
BACKDOOR_MAX="maximal-adjustment"
METHOD_NAMES = {BACKDOOR_DEFAULT, BACKDOOR_EXHAUSTIVE, BACKDOOR_MIN, BACKDOOR_MAX}
DEFAULT_BACKDOOR_METHOD = BACKDOOR_MAX
DEFAULT_BACKDOOR_METHOD = BACKDOOR_DEFAULT
def __init__(self, graph, estimand_type,
method_name = "default",
@ -249,19 +249,28 @@ class CausalIdentifier:
)
return estimand
def identify_backdoor(self, treatment_name, outcome_name, include_unobserved=True):
def identify_backdoor(self, treatment_name, outcome_name,
include_unobserved=True, dseparation_algo="default"):
backdoor_sets = []
backdoor_paths = None
bdoor_graph = None
if dseparation_algo == "naive":
backdoor_paths = self._graph.get_backdoor_paths(treatment_name, outcome_name)
elif dseparation_algo == "default":
bdoor_graph = self._graph.do_surgery(treatment_name,
remove_outgoing_edges=True)
else:
raise ValueError(f"d-separation algorithm {dseparation_algo} is not supported")
method_name = self.method_name if self.method_name != CausalIdentifier.BACKDOOR_DEFAULT else CausalIdentifier.DEFAULT_BACKDOOR_METHOD
# First, checking if empty set is a valid backdoor set
empty_set = set()
check = self._graph.check_valid_backdoor_set(treatment_name, outcome_name, empty_set,
backdoor_paths=backdoor_paths)
check = self._graph.check_valid_backdoor_set(treatment_name,
outcome_name, empty_set,
backdoor_paths=backdoor_paths, new_graph=bdoor_graph,
dseparation_algo=dseparation_algo)
if check["is_dseparated"]:
backdoor_sets.append({
'backdoor_set':empty_set,
'num_paths_blocked_by_observed_nodes': check["num_paths_blocked_by_observed_nodes"]})
backdoor_sets.append({'backdoor_set':empty_set})
# If the method is `minimal-adjustment`, return the empty set right away.
if method_name == CausalIdentifier.BACKDOOR_MIN:
return backdoor_sets
@ -271,33 +280,75 @@ class CausalIdentifier:
- set(treatment_name) \
- set(outcome_name)
eligible_variables -= self._graph.get_descendants(treatment_name)
num_iterations = 0
found_valid_adjustment_set = False
# If var is d-separated from both treatment and outcome, it cannot
# be a part of the backdoor set
filt_eligible_variables = set()
for var in eligible_variables:
dsep_treat_var = self._graph.check_dseparation(
treatment_name, parse_state(var),
set())
dsep_outcome_var = self._graph.check_dseparation(
outcome_name, parse_state(var), set())
if not dsep_outcome_var or not dsep_treat_var:
filt_eligible_variables.add(var)
if method_name in CausalIdentifier.METHOD_NAMES:
# If `minimal-adjustment` method is specified, start the search from the set with minimum size. Otherwise, start from the largest.
set_sizes = range(1, len(eligible_variables) + 1, 1) if method_name == CausalIdentifier.BACKDOOR_MIN else range(len(eligible_variables), 0, -1)
for size_candidate_set in set_sizes:
for candidate_set in itertools.combinations(eligible_variables, size_candidate_set):
check = self._graph.check_valid_backdoor_set(treatment_name,
outcome_name, candidate_set, backdoor_paths=backdoor_paths)
self.logger.debug("Candidate backdoor set: {0}, is_dseparated: {1}, No. of paths blocked by observed_nodes: {2}".format(candidate_set, check["is_dseparated"], check["num_paths_blocked_by_observed_nodes"]))
if check["is_dseparated"]:
backdoor_sets.append({
'backdoor_set': candidate_set,
'num_paths_blocked_by_observed_nodes': check["num_paths_blocked_by_observed_nodes"]})
found_valid_adjustment_set = True
num_iterations += 1
if method_name == CausalIdentifier.BACKDOOR_EXHAUSTIVE and num_iterations > CausalIdentifier.MAX_BACKDOOR_ITERATIONS:
break
# If the backdoor method is `maximal-adjustment` or `minimal-adjustment`, return the first found adjustment set.
if method_name in {CausalIdentifier.BACKDOOR_MAX, CausalIdentifier.BACKDOOR_MIN} and found_valid_adjustment_set:
break
backdoor_sets, found_valid_adjustment_set = self.find_valid_adjustment_sets(
treatment_name, outcome_name,
backdoor_paths, bdoor_graph,
dseparation_algo,
backdoor_sets, filt_eligible_variables,
method_name=method_name,
max_iterations= CausalIdentifier.MAX_BACKDOOR_ITERATIONS)
if method_name == CausalIdentifier.BACKDOOR_DEFAULT and found_valid_adjustment_set:
# repeat the above search with BACKDOOR_MIN
backdoor_sets, _ = self.find_valid_adjustment_sets(
treatment_name, outcome_name,
backdoor_paths, bdoor_graph,
dseparation_algo,
backdoor_sets, filt_eligible_variables,
method_name=CausalIdentifier.BACKDOOR_MIN,
max_iterations= CausalIdentifier.MAX_BACKDOOR_ITERATIONS)
else:
raise ValueError(f"Identifier method {method_name} not supported. Try one of the following: {CausalIdentifier.METHOD_NAMES}")
return backdoor_sets
# Search subsets of `filt_eligible_variables` for valid backdoor adjustment sets.
#
# Each candidate subset is passed to `self._graph.check_valid_backdoor_set`
# together with the treatment, the outcome, and the precomputed
# `backdoor_paths` / `bdoor_graph` / `dseparation_algo` arguments. Every
# candidate reported as d-separating is appended to `backdoor_sets` as
# `{'backdoor_set': candidate}`. The list passed in is mutated in place and
# is also returned.
#
# Returns a tuple `(backdoor_sets, found_valid_adjustment_set)`; the flag is
# True when at least one candidate examined by this call was valid.
#
# NOTE(review): leading indentation is missing in this view, so the nesting
# of the iteration counter and of the `break` checks below cannot be read
# off the text -- confirm against the real file before changing control flow.
def find_valid_adjustment_sets(self, treatment_name, outcome_name,
backdoor_paths, bdoor_graph, dseparation_algo,
backdoor_sets, filt_eligible_variables,
method_name, max_iterations):
# Number of candidate sets examined so far.
num_iterations = 0
found_valid_adjustment_set = False
# If `minimal-adjustment` method is specified, start the search from the set with minimum size. Otherwise, start from the largest.
# Size 0 (the empty set) is never generated by either range.
set_sizes = range(1, len(filt_eligible_variables) + 1, 1) if method_name == CausalIdentifier.BACKDOOR_MIN else range(len(filt_eligible_variables), 0, -1)
for size_candidate_set in set_sizes:
# Candidates are tuples produced by itertools.combinations.
for candidate_set in itertools.combinations(filt_eligible_variables, size_candidate_set):
check = self._graph.check_valid_backdoor_set(treatment_name,
outcome_name, candidate_set,
backdoor_paths=backdoor_paths,
new_graph = bdoor_graph,
dseparation_algo = dseparation_algo)
self.logger.debug("Candidate backdoor set: {0}, is_dseparated: {1}".format(candidate_set, check["is_dseparated"]))
if check["is_dseparated"]:
backdoor_sets.append({'backdoor_set': candidate_set})
found_valid_adjustment_set = True
num_iterations += 1
# The exhaustive search is capped at `max_iterations` candidates.
if method_name == CausalIdentifier.BACKDOOR_EXHAUSTIVE and num_iterations > max_iterations:
self.logger.warning(f"Max number of iterations {max_iterations} reached.")
break
# If the backdoor method is `default`, `maximal-adjustment` or `minimal-adjustment`, stop searching once a valid adjustment set has been found.
if method_name in {CausalIdentifier.BACKDOOR_DEFAULT, CausalIdentifier.BACKDOOR_MAX, CausalIdentifier.BACKDOOR_MIN} and found_valid_adjustment_set:
break
# If all variables are observed, and the biggest eligible set
# does not satisfy backdoor, then none of its subsets will.
if method_name in {CausalIdentifier.BACKDOOR_DEFAULT, CausalIdentifier.BACKDOOR_MAX} and self._graph.all_observed(filt_eligible_variables):
break
# Iteration cap applied regardless of method.
# NOTE(review): this warning says no valid set was found, but the check
# does not look at `found_valid_adjustment_set` -- confirm the message is intended.
if num_iterations > max_iterations:
self.logger.warning(f"Max number of iterations {max_iterations} reached. Could not find a valid backdoor set.")
break
return backdoor_sets, found_valid_adjustment_set
def get_default_backdoor_set_id(self, backdoor_sets_dict):
# Adding a None estimand if no backdoor set found
if len(backdoor_sets_dict) == 0:
@ -310,12 +361,12 @@ class CausalIdentifier:
min_iv_keys = {key for key, iv_count in iv_count_dict.items() if iv_count == min_iv_count}
min_iv_backdoor_sets_dict = {key: backdoor_sets_dict[key] for key in min_iv_keys}
# Default set is the one with the most number of adjustment variables (optimizing for minimum (unknown) bias not for efficiency)
max_set_length = -1
# Default set is the one with the least number of adjustment variables (optimizing for efficiency)
min_set_length = 1000000
default_key = None
for key, bdoor_set in min_iv_backdoor_sets_dict.items():
if len(bdoor_set) > max_set_length:
max_set_length = len(bdoor_set)
if len(bdoor_set) < min_set_length:
min_set_length = len(bdoor_set)
default_key = key
return default_key
@ -328,11 +379,12 @@ class CausalIdentifier:
proceed_when_unidentifiable = self._proceed_when_unidentifiable
is_identified = [ self._graph.all_observed(bset["backdoor_set"]) for bset in backdoor_sets ]
if all(is_identified):
self.logger.info("All common causes are observed. Causal effect can be identified.")
if any(is_identified):
self.logger.info("Causal effect can be identified.")
backdoor_sets_arr = [list(
bset["backdoor_set"])
for bset in backdoor_sets]
for bset in backdoor_sets
if self._graph.all_observed(bset["backdoor_set"]) ]
else: # there is unobserved confounding
self.logger.warning("If this is observed data (not from a randomized experiment), there might always be missing confounders. Causal effect cannot be identified perfectly.")
response = False # user response
@ -349,11 +401,11 @@ class CausalIdentifier:
self.logger.warn("Identification failed due to unobserved variables.")
backdoor_sets_arr = []
if proceed_when_unidentifiable or response is True:
max_paths_blocked = max( bset['num_paths_blocked_by_observed_nodes'] for bset in backdoor_sets)
backdoor_sets_arr = [list(
self._graph.filter_unobserved_variables(bset["backdoor_set"]))
for bset in backdoor_sets
if bset["num_paths_blocked_by_observed_nodes"]==max_paths_blocked]
# Just removing the unobserved variable
backdoor_sets_arr = []
for bset in backdoor_sets:
curr_set = list(self._graph.filter_unobserved_variables(bset["backdoor_set"]))
backdoor_sets_arr.append(curr_set)
for i in range(len(backdoor_sets_arr)):
backdoor_estimand_expr = self.construct_backdoor_estimand(
@ -364,20 +416,59 @@ class CausalIdentifier:
backdoor_variables_dict["backdoor"+str(i+1)] = backdoor_sets_arr[i]
return estimands_dict, backdoor_variables_dict
def identify_frontdoor(self):
def identify_frontdoor(self, dseparation_algo="default"):
""" Find a valid frontdoor variable if it exists.
Currently only supports a single variable frontdoor set.
"""
frontdoor_var = None
frontdoor_paths = None
fdoor_graph = None
if dseparation_algo == "default":
cond1_graph = self._graph.do_surgery(self.treatment_name,
remove_incoming_edges=True)
bdoor_graph1 = self._graph.do_surgery(self.treatment_name,
remove_outgoing_edges=True)
elif dseparation_algo == "naive":
frontdoor_paths = self._graph.get_all_directed_paths(self.treatment_name, self.outcome_name)
else:
raise ValueError(f"d-separation algorithm {dseparation_algo} is not supported")
eligible_variables = self._graph.get_descendants(self.treatment_name) \
- set(self.outcome_name)
- set(self.outcome_name) \
- set(self._graph.get_descendants(self.outcome_name))
# For simplicity, assuming a one-variable frontdoor set
for candidate_var in eligible_variables:
is_valid_frontdoor = self._graph.check_valid_frontdoor_set(self.treatment_name,
self.outcome_name, parse_state(candidate_var), frontdoor_paths=frontdoor_paths)
self.logger.debug("Candidate frontdoor set: {0}, is_dseparated: {1}".format(candidate_var, is_valid_frontdoor))
# Cond 1: All directed paths intercepted by candidate_var
cond1 = self._graph.check_valid_frontdoor_set(
self.treatment_name, self.outcome_name,
parse_state(candidate_var),
frontdoor_paths=frontdoor_paths,
new_graph=cond1_graph,
dseparation_algo=dseparation_algo)
self.logger.debug("Candidate frontdoor set: {0}, is_dseparated: {1}".format(candidate_var, cond1))
if not cond1:
continue
# Cond 2: No confounding between treatment and candidate var
cond2 = self._graph.check_valid_backdoor_set(
self.treatment_name, parse_state(candidate_var),
set(),
backdoor_paths=None,
new_graph= bdoor_graph1,
dseparation_algo=dseparation_algo)
if not cond2:
continue
# Cond 3: treatment blocks all confounding between candidate_var and outcome
bdoor_graph2 = self._graph.do_surgery(candidate_var,
remove_outgoing_edges=True)
cond3 = self._graph.check_valid_backdoor_set(
parse_state(candidate_var), self.outcome_name,
self.treatment_name,
backdoor_paths=None,
new_graph= bdoor_graph2,
dseparation_algo=dseparation_algo)
is_valid_frontdoor = cond1 and cond2 and cond3
if is_valid_frontdoor:
frontdoor_var = candidate_var
break

Просмотреть файл

@ -8,6 +8,7 @@ class IdentificationTestGraphSolution(object):
def __init__(self, graph_str, observed_variables, biased_sets, minimal_adjustment_sets, maximal_adjustment_sets):
self.graph = CausalGraph("X", "Y", graph_str, observed_node_names=observed_variables)
self.graph_str = graph_str
self.observed_variables = observed_variables
self.biased_sets = biased_sets
self.minimal_adjustment_sets = minimal_adjustment_sets

Просмотреть файл

@ -73,7 +73,7 @@ class TestBackdoorIdentification(object):
set(backdoor_result_dict["backdoor_set"])
for backdoor_result_dict in backdoor_results
]
print(backdoor_sets, expected_sets, example_graph_solution.graph_str)
assert (
((len(backdoor_sets) == 0) and (len(expected_sets) == 0)) # No adjustments exist and that's expected.
or

Просмотреть файл

@ -30,12 +30,15 @@ class TestOptimizeBackdoorIdentifier(object):
# Obtain backdoor sets
path = Backdoor(identifier._graph._graph, treatment_name, outcome_name)
backdoor_sets = path.get_backdoor_vars()
print(backdoor_sets)
# Check if backdoor sets are valid i.e. if they block all paths between the treatment and the outcome
backdoor_paths = identifier._graph.get_backdoor_paths(treatment_name, outcome_name)
check_set = set(backdoor_sets[0]['backdoor_set'])
check = identifier._graph.check_valid_backdoor_set(treatment_name, outcome_name, check_set, backdoor_paths=backdoor_paths)
check = identifier._graph.check_valid_backdoor_set(
treatment_name, outcome_name, check_set,
backdoor_paths=backdoor_paths,
dseparation_algo="naive")
print(check)
assert check["is_dseparated"]
def test_2(self):