# CameraTraps/archive/taxonomy_mapping/taxonomy_graph.py
#
# 321 lines
# 12 KiB
# Python

r"""
Methods for transforming taxonomy CSV into a graph structure backed by
NetworkX.
We treat each taxon in the taxonomy as a node in a graph, represented by the
TaxonNode class. We use a NetworkX directed graph (nx.DiGraph) to keep track of
the edges (parent-child relationships) between the nodes.
In theory, the true biological taxonomy graph should be a tree, where every
taxon node has exactly 1 parent. However, because we use both GBIF and INAT
taxonomies, there are 2 situations where a taxon node ends up with two parents.
Thus, the graph is actually a "directed acyclic graph" (DAG) instead of a tree.
The two situations are explained in detail below. This module includes a
function dag_to_tree() which converts a DAG to a tree by heuristically removing
edges from the DAG so that each node only has 1 parent.
CASE 1: INAT and GBIF have different granularity in their taxonomy levels
======
An example is shown below. In dag_to_tree(), the lower parent is kept, while
the higher-up parent is discarded. In this example, the "sciurini -> sciurus"
edge would be kept, while "sciuridae -> sciurus" would be removed.

    "eastern gray squirrel" (inat)    "squirrel" (gbif)
    ------------------------------    -----------------
    family:                   sciuridae
                             /         \
    subfamily:      sciurinae           |  # skips subfamily
                        |               |
    tribe:          sciurini            |  # skips tribe
                             \         /
    genus:                    sciurus

CASE 2: INAT and GBIF have different taxonomies
======
An example is shown below. In dag_to_tree(), the resolutions to these
discrepancies are hard-coded.

    order:   cathartiformes (inat)    accipitriformes (gbif)
                         \            /
    family:               cathartidae
"""
# allow forward references in typing annotations
from __future__ import annotations
from typing import (ClassVar, Container, Dict, Iterable, List, Optional, Set,
Tuple)
import networkx as nx
import pandas as pd
# Taxonomy source name recorded with each taxon ID when a taxonomy CSV row has
# no 'source' entry (see build_taxonomy_graph).
default_source = 'inat'
class TaxonNode:
    """
    A node in a taxonomy graph (DAG), associated with a set of dataset labels.

    By default, we support multiple parents for each TaxonNode. See discussion
    in module docstring above.

    Edges are not stored on the node itself: parent/child relationships are
    read from and written to the nx.DiGraph held in `self.graph`, with edges
    pointing from parent to child.

    Attributes:
        single_parent_only: class-wide flag; when True, add_parent() asserts
            that a node never receives a second, different parent
        level: taxonomic level of this node
        name: name of the taxon
        ids: set of (source, taxon_id) tuples recorded via add_id()
        graph: the graph holding this node's edges, or None if unattached
        dataset_labels: set of (dataset_name, dataset_label) tuples attached
            directly to this node (descendants' labels are NOT included)
    """
    # class variables
    single_parent_only: ClassVar[bool] = False

    # instance variables
    level: str
    name: str
    ids: Set[Tuple[str, int]]
    graph: Optional[nx.DiGraph]
    dataset_labels: Set[Tuple[str, str]]

    def __init__(self, level: str, name: str,
                 graph: Optional[nx.DiGraph] = None):
        """Initializes a TaxonNode with no IDs and no dataset labels.

        Note that the node is not added to `graph` here; it only appears in
        the graph once an edge involving it is added.
        """
        self.level = level
        self.name = name
        self.graph = graph
        self.ids = set()
        self.dataset_labels = set()

    def __repr__(self):
        id_str = ', '.join(
            f'{source}={taxon_id}' for source, taxon_id in self.ids)
        return f'TaxonNode({id_str}, level={self.level}, name={self.name})'

    @property
    def parents(self) -> List[TaxonNode]:
        """List of this node's parents (its predecessors in the graph)."""
        assert self.graph is not None
        return list(self.graph.predecessors(self))

    @parents.setter
    def parents(self, parents: Iterable[TaxonNode]) -> None:
        """Replaces all incoming edges with edges from the given parents."""
        assert self.graph is not None
        # self.parents returns a fresh list, so removing edges while iterating
        # over it is safe
        for p in self.parents:
            self.graph.remove_edge(p, self)
        for p in parents:
            self.graph.add_edge(p, self)

    @property
    def children(self) -> List[TaxonNode]:
        """List of this node's children (its successors in the graph)."""
        assert self.graph is not None
        return list(self.graph.successors(self))

    @children.setter
    def children(self, children: Iterable[TaxonNode]) -> None:
        """Replaces all outgoing edges with edges to the given children."""
        assert self.graph is not None
        for c in self.children:
            self.graph.remove_edge(self, c)
        for c in children:
            self.graph.add_edge(self, c)

    def add_id(self, source: str, taxon_id: int) -> None:
        """Records a (source, taxon_id) pair for this taxon.

        Args:
            source: str, one of 'gbif', 'inat', 'manual'
            taxon_id: int, ID of this taxon in the given source
        """
        assert source in ['gbif', 'inat', 'manual']
        self.ids.add((source, taxon_id))

    def add_parent(self, parent: TaxonNode) -> None:
        """Adds a TaxonNode to the list of parents of the current TaxonNode.
        Requires this TaxonNode to be associated with a Graph.

        If TaxonNode.single_parent_only is set and this node already has a
        parent, no edge is added; instead it is asserted that the existing
        parent is the very same object as `parent`.

        Args:
            parent: TaxonNode, must be higher in the taxonomical hierarchy
        """
        assert self.graph is not None
        parents = self.parents
        if TaxonNode.single_parent_only and len(parents) > 0:
            assert len(parents) == 1
            assert parents[0] is parent, (
                f'self.parents: {parents}, new parent: {parent}')
            return
        if parent not in parents:
            self.graph.add_edge(parent, self)

    def add_child(self, child: TaxonNode) -> None:
        """Adds a TaxonNode to the list of children of the current TaxonNode.
        Requires this TaxonNode to be associated with a Graph.

        Args:
            child: TaxonNode, must be lower in the taxonomical hierarchy
        """
        assert self.graph is not None
        self.graph.add_edge(self, child)

    def add_dataset_label(self, ds: str, ds_label: str) -> None:
        """Attaches a dataset label directly to this node.

        Args:
            ds: str, name of dataset
            ds_label: str, name of label used by that dataset
        """
        self.dataset_labels.add((ds, ds_label))

    def get_dataset_labels(self,
                           include_datasets: Optional[Container[str]] = None
                           ) -> Set[Tuple[str, str]]:
        """Returns a set of all (ds, ds_label) tuples that belong to this taxon
        node or its descendants.

        The returned set is always a new object; this node's own
        `dataset_labels` attribute is never modified.

        Args:
            include_datasets: list of str, names of datasets to include
                if None, then all datasets are included

        Returns: set of (ds, ds_label) tuples
        """
        if include_datasets is None:
            # copy, so that merging in the children's labels below does not
            # write the descendants' labels into this node's own label set
            result = set(self.dataset_labels)
        else:
            result = {
                tup for tup in self.dataset_labels
                if tup[0] in include_datasets
            }

        for child in self.children:
            result |= child.get_dataset_labels(include_datasets)
        return result

    @classmethod
    def lowest_common_ancestor(cls, nodes: Iterable[TaxonNode]
                               ) -> Optional[TaxonNode]:
        """Returns the lowest common ancestor (LCA) of a list or set of nodes.

        For each node in <nodes>, get the set of nodes on the path to the root.
        The LCA of <nodes> is certainly in the intersection of these sets.
        Iterate through the nodes in this set intersection, looking for a node
        such that none of its children is in this intersection. Given n nodes
        from a k-ary tree of height h, the algorithm runs in O((n + k)h).

        Returns: TaxonNode, the LCA if it exists, or None if no LCA exists
            (including when <nodes> is empty)
        """
        paths = []
        for node in nodes:
            # collect the node itself plus every ancestor reachable from it
            path = {node}
            remaining = list(node.parents)
            while remaining:
                x = remaining.pop()
                if x not in path:
                    path.add(x)
                    remaining.extend(x.parents)
            paths.append(path)

        # set.intersection() requires at least one argument
        if not paths:
            return None

        intersect = set.intersection(*paths)
        for node in intersect:
            # the lowest common ancestor has no child that is also common
            if intersect.isdisjoint(node.children):
                return node
        return None
def build_taxonomy_graph(taxonomy_df: pd.DataFrame
                         ) -> Tuple[
                             nx.DiGraph,
                             Dict[Tuple[str, str], TaxonNode],
                             Dict[Tuple[str, str], TaxonNode]
                         ]:
    """Builds the taxonomy graph and its lookup tables from the taxonomy CSV.

    Each row's 'taxonomy_string' is parsed into a sequence of
    (taxon_id, taxon_level, taxon_name, _) tuples, where the first tuple is
    the taxon the row's label maps to and each following tuple is the parent
    of the one before it. One TaxonNode is created per distinct
    (taxon_level, taxon_name), and consecutive taxa are linked parent -> child.

    Args:
        taxonomy_df: pd.DataFrame, the taxonomy CSV. Columns read here:
            'dataset_name', 'query', 'taxonomy_string', 'taxonomy_level',
            'scientific_name', and optionally 'source'

    Returns:
        graph: nx.DiGraph
        taxon_to_node: dict, maps (taxon_level, taxon_name) to a TaxonNode,
            keys are all lowercase
        label_to_node: dict, maps (dataset_name, dataset_label) to the lowest
            TaxonNode node in the tree that contains the label,
            keys are all lowercase
    """
    graph = nx.DiGraph()
    taxon_to_node = {}  # (taxon_level, taxon_name) -> TaxonNode
    label_to_node = {}  # (dataset_name, dataset_label) -> TaxonNode

    for _, row in taxonomy_df.iterrows():
        ds = row['dataset_name'].lower()
        ds_label = row['query'].lower()
        id_source = row['source'] if 'source' in row else default_source

        taxa_ancestry = row['taxonomy_string']
        if pd.isna(taxa_ancestry):
            # taxonomy CSV rows without 'taxonomy_string' entries are excluded
            # from the taxonomy graph, but can be included in a classification
            # label specification JSON via the 'dataset_labels' key
            continue

        # NOTE(review): eval() executes whatever expression the CSV cell
        # contains. Only use this on trusted taxonomy CSVs; consider
        # ast.literal_eval if the cells are always plain literals.
        taxa_ancestry = eval(taxa_ancestry)  # pylint: disable=eval-used

        taxon_child: Optional[TaxonNode] = None
        for i, taxon in enumerate(taxa_ancestry):
            taxon_id, raw_level, raw_name, _ = taxon
            taxon_level = raw_level.lower()
            taxon_name = raw_name.lower()
            key = (taxon_level, taxon_name)

            # reuse the node for this taxon if an earlier row created it
            node = taxon_to_node.get(key)
            if node is None:
                node = TaxonNode(level=taxon_level, name=taxon_name,
                                 graph=graph)
                taxon_to_node[key] = node

            # link to the taxon one step lower in this row's ancestry
            if taxon_child is not None:
                node.add_child(taxon_child)

            node.add_id(id_source, int(taxon_id))  # np.int64 -> int

            # the first entry is the taxon that the row's label maps to
            if i == 0:
                assert row['taxonomy_level'] == taxon_level, (
                    f'taxonomy CSV level: {row["taxonomy_level"]}, '
                    f'level from taxonomy_string: {taxon_level}')
                assert row['scientific_name'] == taxon_name
                node.add_dataset_label(ds, ds_label)
                label_to_node[(ds, ds_label)] = node

            taxon_child = node

    assert nx.is_directed_acyclic_graph(graph)
    return graph, taxon_to_node, label_to_node
def dag_to_tree(graph: nx.DiGraph,
                taxon_to_node: Dict[Tuple[str, str], TaxonNode]) -> nx.DiGraph:
    """Converts the taxonomy graph from a DAG to a tree. See module docstring
    for more information.

    For a node with exactly 2 parents, the lower parent (the one that is a
    descendant of the other) is kept. If neither parent descends from the
    other, a hard-coded resolution keyed on the node's name is used.

    NOTE: nx.is_tree() on the output of this function might fail because the
    tree may have disconnected components. Instead, check nx.is_tree() on each
    component separately.

    NOTE: as a side effect, the `graph` attribute of every node is re-pointed
    to the returned tree, so node.parents / node.children reflect the tree
    afterwards.

    Args:
        graph: nx.DiGraph, DAG representation of taxonomy hierarchy
        taxon_to_node: dict, maps (taxon_level, taxon_name) to a TaxonNode

    Returns: nx.DiGraph, a tree-structured graph

    Raises:
        AssertionError: if a node has 2 parents that are not ancestors of one
            another and no hard-coded resolution exists for it, or if the
            hard-coded parent is not one of the node's 2 parents
        KeyError: if a hard-coded parent is missing from taxon_to_node
    """
    # hard-coded resolutions: node name -> (level, name) of the parent to keep
    special_case_parents = {
        'cathartidae': ('order', 'accipitriformes'),
        'soricidae': ('order', 'eulipotyphla'),
        'nyctanassa violacea': ('genus', 'nyctanassa'),
        'trochilidae': ('order', 'caprimulgiformes'),  # controversial
    }

    tree = nx.DiGraph()
    for node in graph.nodes:
        tree.add_node(node)
        parents = node.parents  # each access rebuilds the list; read it once

        if len(parents) == 1:
            tree.add_edge(parents[0], node)

        elif len(parents) == 2:
            p0, p1 = parents

            # use the lower parent
            if p1 in nx.descendants(graph, p0):
                tree.add_edge(p1, node)
            elif p0 in nx.descendants(graph, p1):
                tree.add_edge(p0, node)
            else:
                # special cases
                # Explicit raises (rather than assert statements) so that the
                # checks still run under `python -O`; otherwise a stale `p`
                # from an earlier iteration could be silently reused.
                if node.name not in special_case_parents:
                    raise AssertionError(
                        f'no parent resolution for {node}; '
                        f'parents: {p0}, {p1}')
                p = taxon_to_node[special_case_parents[node.name]]
                if p is not p0 and p is not p1:
                    raise AssertionError(
                        f'hard-coded parent {p} of {node} is not one of its '
                        f'parents: {p0}, {p1}')
                tree.add_edge(p, node)

        # Nodes with no parents get no incoming edge. Nodes with more than 2
        # parents also reach this point and get no incoming edge.

    for node in tree.nodes:
        node.graph = tree
    return tree