# Mirror of https://github.com/microsoft/reconner.git
"""Make corrections to your data."""
import copy
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, cast

import spacy
from spacy.tokens import Doc as SpacyDoc, Span as SpacySpan
from wasabi import msg

from .operations import operation
from .preprocess import SpacyPreProcessor
from .types import Correction, Example, Span, Token, TransformationCallbacks


@operation("recon.v1.rename_labels")
def rename_labels(example: Example, label_map: Dict[str, str]) -> Example:
    """Rename labels in an Example.

    Args:
        example (Example): Input Example
        label_map (Dict[str, str]): One-to-one mapping of old label names to new label names

    Returns:
        Example: Example with renamed labels
    """
    for span in example.spans:
        span.label = label_map.get(span.label, span.label)
    return example

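# Usage sketch (hypothetical data; assumes the decorated function can still be
# called directly on a single Example — in a full pipeline you would typically
# run it over a dataset via the registered op name "recon.v1.rename_labels"):
#
#     example = Example(
#         text="London is a city.",
#         spans=[Span(text="London", start=0, end=6, label="LOC")],
#     )
#     example = rename_labels(example, {"LOC": "GPE"})
#     assert example.spans[0].label == "GPE"
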
@operation("recon.v1.fix_annotations")
def fix_annotations(
    example: Example,
    corrections: List[Correction],
    case_sensitive: bool = False,
    dryrun: bool = False,
) -> Example:
    """Fix annotations in an Example.

    This function will NOT add annotations to your data.
    It will only remove erroneous annotations and fix the
    labels for specific spans.

    Args:
        example (Example): Input Example
        corrections (List[Correction]): Corrections mapping annotation text to a new label.
            If to_label is set to None, the annotation will be removed
        case_sensitive (bool, optional): Consider case of text for each correction
        dryrun (bool, optional): Treat corrections as a dryrun and just print all changes to be made

    Returns:
        Example: Example with fixed annotations
    """
    if not case_sensitive:
        for c in corrections:
            c.annotation = c.annotation.lower()

    corrections_map: Dict[str, Correction] = {c.annotation: c for c in corrections}
    prints: List[str] = []

    ents_to_remove: List[int] = []
    for i, s in enumerate(example.spans):
        t = s.text if case_sensitive else s.text.lower()

        if t in corrections_map:
            c = corrections_map[t]
            if c.to_label is None and (s.label in c.from_labels or "ANY" in c.from_labels):
                if dryrun:
                    prints.append(f"Deleting span: {s.text}")
                else:
                    ents_to_remove.append(i)
            elif s.label in c.from_labels or "ANY" in c.from_labels:
                if dryrun:
                    prints.append(
                        f"Correcting span: {s.text} from labels: {c.from_labels} to label: {c.to_label}"
                    )
                else:
                    s.label = cast(str, c.to_label)

    # Delete flagged spans from the end so earlier indices stay valid.
    for idx in reversed(ents_to_remove):
        del example.spans[idx]

    if dryrun:
        msg.divider("Example Text")
        msg.text(example.text)
        for line in prints:
            msg.text(line)

    return example

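# Usage sketch (hypothetical corrections; Correction fields follow recon.types):
#
#     corrections = [
#         Correction(annotation="london", from_labels=["ANY"], to_label="GPE"),
#         Correction(annotation="the", from_labels=["GPE"], to_label=None),  # delete
#     ]
#     example = fix_annotations(example, corrections, dryrun=True)
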
def corrections_from_dict(corrections_dict: Dict[str, Any]) -> List[Correction]:
    """Create a list of Correction objects from a simpler dict-based config that
    maps annotation text to either the label to convert to, a (from_label, to_label)
    pairing, or a (List[from_labels], to_label) pairing if you want to convert a
    subset of labels at a time.

    Args:
        corrections_dict (Dict[str, Any]): Corrections formatted dict
            e.g. {
                "united states": "GPE",
                "London": (["LOC"], "GPE")
            }

    Raises:
        ValueError: If a value is neither a str, None, nor a tuple

    Returns:
        List[Correction]: Corrections to pass to fix_annotations
    """
    corrections: List[Correction] = []
    for key, val in corrections_dict.items():
        if isinstance(val, str) or val is None:
            from_labels = ["ANY"]
            to_label = val
        elif isinstance(val, tuple):
            if isinstance(val[0], str):
                from_labels = [val[0]]
            else:
                from_labels = val[0]
            to_label = val[1]
        else:
            raise ValueError(
                "Cannot parse corrections dict. Value must be either a str of the label "
                + "to change the annotation to (TO_LABEL) or a tuple of (FROM_LABEL, TO_LABEL)"
            )
        corrections.append(Correction(annotation=key, from_labels=from_labels, to_label=to_label))
    return corrections

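# Usage sketch (hypothetical mapping; mirrors the docstring example):
#
#     corrections = corrections_from_dict({
#         "united states": "GPE",        # any label -> GPE
#         "London": (["LOC"], "GPE"),    # only LOC -> GPE
#         "the": None,                   # remove the annotation entirely
#     })
#     example = fix_annotations(example, corrections)
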
@operation("recon.v1.strip_annotations")
def strip_annotations(
    example: Example, strip_chars: List[str] = [".", "!", "?", "-", ":", " "]
) -> Example:
    """Strip punctuation and spaces from the start and end of annotations.
    These characters are almost always a mistake and will confuse a model.

    Args:
        example (Example): Input Example
        strip_chars (List[str], optional): Characters to strip.

    Returns:
        Example: Example with stripped spans
    """
    for s in example.spans:
        # Strip leading characters, keeping the start offset in sync.
        while s.text and s.text[0] in strip_chars:
            s.text = s.text[1:]
            s.start += 1
        # Strip trailing characters, keeping the end offset in sync.
        while s.text and s.text[-1] in strip_chars:
            s.text = s.text[:-1]
            s.end -= 1
    return example

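# Usage sketch (hypothetical span with a trailing period):
#
#     example = Example(
#         text="Born in Detroit.",
#         spans=[Span(text="Detroit.", start=8, end=16, label="GPE")],
#     )
#     example = strip_annotations(example)
#     # span text is now "Detroit", end offset now 15
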
# spaCy v2-style setup (spaCy v3 would use nlp.add_pipe("sentencizer")): a blank
# English pipeline with only a sentencizer, wrapped in a SpacyPreProcessor so
# operations can share the parsed Doc via preprocessed_outputs.
nlp = spacy.blank("en")
nlp.add_pipe(nlp.create_pipe("sentencizer"))
spacy_pre = SpacyPreProcessor(nlp)


@operation("recon.v1.split_sentences", pre=[spacy_pre])
def split_sentences(example: Example, preprocessed_outputs: Dict[str, Any] = {}) -> List[Example]:
    """Split a single example into multiple examples by splitting the text into
    multiple sentences and resetting entity and token offsets based on offsets
    relative to sentence boundaries.

    Args:
        example (Example): Input Example
        preprocessed_outputs (Dict[str, Any], optional): Outputs of preprocessors.

    Returns:
        List[Example]: List of split examples.
            Could be a list of 1 if the example is just one sentence.
    """
    doc = preprocessed_outputs["recon.v1.spacy"]

    new_examples = []
    ents = []
    for ent in example.spans:
        span = doc.char_span(ent.start, ent.end, label=ent.label)
        if not span:
            # Char offsets don't align to token boundaries; fall back to a
            # token whose text matches the annotated text exactly.
            token = None
            text = doc.text[ent.start : ent.end]
            for t in doc:
                if t.text == text:
                    token = t
            if token:
                span = SpacySpan(doc, token.i, token.i + 1, label=ent.label)
        if span:
            ents.append(span)

    doc.ents = ents

    for sent in doc.sents:
        sent_doc = sent.as_doc()
        new_example = Example(
            text=sent_doc.text,
            spans=[
                Span(
                    text=e.text,
                    start=e.start_char,
                    end=e.end_char,
                    token_start=e.start,
                    token_end=e.end,
                    label=e.label_,
                )
                for e in sent_doc.ents
            ],
            tokens=[
                Token(text=t.text, start=t.idx, end=t.idx + len(t.text), id=i)
                for i, t in enumerate(sent_doc)
            ],
        )
        new_examples.append(new_example)
    return new_examples

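# Usage sketch (assumes the "recon.v1.spacy" pre-processor output is a spaCy Doc;
# when invoked through the operations framework, the spacy_pre hook declared in
# the decorator fills preprocessed_outputs in automatically):
#
#     doc = nlp("Berlin is a city. Paris is too.")
#     example = Example(
#         text=doc.text,
#         spans=[Span(text="Berlin", start=0, end=6, label="GPE")],
#     )
#     split = split_sentences(example, preprocessed_outputs={"recon.v1.spacy": doc})
#     # -> two Examples, one per sentence, with offsets relative to each sentence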