diff --git a/spacy/pipeline/__init__.py b/spacy/pipeline/__init__.py
index 2c4a5a8a87f..02c900310b5 100644
--- a/spacy/pipeline/__init__.py
+++ b/spacy/pipeline/__init__.py
@@ -1,4 +1,5 @@
 from .attributeruler import AttributeRuler
+from .coordinationruler import CoordinationSplitter
 from .dep_parser import DependencyParser
 from .edit_tree_lemmatizer import EditTreeLemmatizer
 from .entity_linker import EntityLinker
@@ -21,6 +22,7 @@ __all__ = [
     "AttributeRuler",
+    "CoordinationSplitter",
     "DependencyParser",
     "EditTreeLemmatizer",
     "EntityLinker",
diff --git a/spacy/pipeline/coordinationruler.py b/spacy/pipeline/coordinationruler.py
new file mode 100644
index 00000000000..31ae729c5a3
--- /dev/null
+++ b/spacy/pipeline/coordinationruler.py
@@ -0,0 +1,248 @@
+from typing import Callable, List, Optional, Union
+
+import pydantic
+from pydantic import BaseModel
+
+if pydantic.VERSION.split(".")[0] == "1":  # type: ignore
+    from pydantic import validator  # type: ignore
+else:
+    from pydantic import field_validator as validator  # type: ignore
+
+from ..language import Language
+from ..tokens import Doc, Token
+from ..vocab import Vocab
+from .pipe import Pipe
+
+######### helper functions across the default splitting rules ##############
+
+
+def _split_doc(doc: Doc) -> bool:
+    """Check whether the document contains a noun phrase
+    with a modifier and a conjunction.
+
+    Args:
+        doc (Doc): The input document.
+
+    Returns:
+        bool: True if the document has a noun phrase
+        with a modifier and a conjunction, else False.
+    """
+    noun_modified = False
+    has_conjunction = False
+
+    for token in doc:
+        # check that the token attaches to a noun, i.e. we are in a noun phrase
+        if token.head.pos_ == "NOUN":
+            for child in token.head.children:
+                if child.dep_ in ["amod", "advmod", "nmod"]:
+                    noun_modified = True
+        # check if there is a conjunction in the phrase
+        if token.pos_ == "CCONJ":
+            has_conjunction = True
+
+    return noun_modified and has_conjunction
+
+
+def _collect_modifiers(token: Token) -> List[str]:
+    """Collect the adjectival modifier phrases for a given token:
+    each "amod" child together with its adverbial modifiers.
+
+    Args:
+        token (Token): The input token.
+
+    Returns:
+        List[str]: A list of modifier phrases for the token.
+    """
+    modifiers = []
+    for child in token.children:
+        if child.dep_ == "amod":
+            # collect adverbial modifiers for this adjective
+            adv_mods = [
+                adv_mod.text
+                for adv_mod in child.children
+                if adv_mod.dep_ == "advmod" and adv_mod.pos_ != "CCONJ"
+            ]
+            modifier_phrase = " ".join(adv_mods + [child.text])
+            modifiers.append(modifier_phrase)
+            # also check for conjunctions to this adjective
+            for conj in child.conjuncts:
+                adv_mods_conj = [
+                    adv_mod.text
+                    for adv_mod in conj.children
+                    if adv_mod.dep_ == "advmod" and adv_mod.pos_ != "CCONJ"
+                ]
+                modifier_phrase_conj = " ".join(adv_mods_conj + [conj.text])
+                modifiers.append(modifier_phrase_conj)
+
+    return modifiers
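+
+# A minimal sketch of how the helpers above behave (hedged; assumes a doc
+# annotated as in the tests below, e.g. "very green apples and oranges" with
+# "very" as advmod of "green" and "green" as amod of "apples"):
+#
+#     _split_doc(doc)             # -> True (modified noun + CCONJ present)
+#     _collect_modifiers(doc[2])  # -> ["very green"] for the noun "apples"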
+
+
+########### DEFAULT COORDINATION SPLITTING RULES ##############
+
+
+def split_noun_coordination(doc: Doc) -> Union[List[str], None]:
+    """Identifies and splits noun phrases with a modifier
+    and a conjunction.
+
+    Construction cases:
+    - "apples and oranges" -> None
+    - "green apples and oranges" -> ["green apples", "green oranges"]
+    - "apples and juicy oranges" -> ["juicy oranges", "juicy apples"]
+    - "hot chicken wings and soup" -> ["hot chicken wings", "hot soup"]
+    - "green apples and rotten oranges" -> ["green apples", "rotten oranges"]
+    - "very green apples and oranges" -> ["very green apples", "very green oranges"]
+    - "delicious and juicy apples" -> ["delicious apples", "juicy apples"]
+    - "delicious but quite sour apples" -> ["delicious apples", "quite sour apples"]
+    - "delicious but quite sour apples and oranges" -> ["delicious apples", "quite sour apples", "delicious oranges", "quite sour oranges"]
+
+    Args:
+        doc (Doc): The input document.
+
+    Returns:
+        Union[List[str], None]: A list of the coordinated noun phrases,
+        or None if no coordinated noun phrases are found.
+    """
+    phrases = []
+    modified_nouns = set()
+    to_split = _split_doc(doc)
+
+    if to_split:
+        for token in doc:
+            if token.dep_ == "amod" and token.head.pos_ == "NOUN":
+                head_noun = token.head
+
+                if head_noun not in modified_nouns:
+                    modifier_phrases = _collect_modifiers(head_noun)
+                    nouns_to_modify = [head_noun] + list(head_noun.conjuncts)
+
+                    for noun in nouns_to_modify:
+                        compound_parts = [
+                            child.text
+                            for child in noun.lefts
+                            if child.dep_ == "compound"
+                        ]
+                        complete_noun_phrase = " ".join(compound_parts + [noun.text])
+                        for modifier_phrase in modifier_phrases:
+                            phrases.append(f"{modifier_phrase} {complete_noun_phrase}")
+                        modified_nouns.add(noun)  # mark this noun as modified
+
+        return phrases if phrases else None
+    else:
+        return None
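+
+# Usage sketch for split_noun_coordination (hedged; "en_core_web_sm" is just
+# an assumed example pipeline -- any English model that sets POS tags and
+# dependency parses will do):
+#
+#     import spacy
+#     nlp = spacy.load("en_core_web_sm")
+#     doc = nlp("very green apples and oranges")
+#     split_noun_coordination(doc)
+#     # -> ["very green apples", "very green oranges"]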
+
+
+###############################################################
+
+
+class SplittingRule(BaseModel):
+    function: Callable[[Doc], Union[List[str], None]]
+
+    @validator("function")
+    def check_return_type(cls, v):
+        # validate the rule against a dummy doc: it must return None or a
+        # list of strings
+        dummy_doc = Doc(Language().vocab, words=["dummy", "doc"], spaces=[True, False])
+        result = v(dummy_doc)
+        if result is not None:
+            if not isinstance(result, list):
+                raise ValueError(
+                    "The custom splitting rule must return None or a list."
+                )
+            elif not all(isinstance(item, str) for item in result):
+                raise ValueError(
+                    "The custom splitting rule must return None or a list of strings."
+                )
+        return v
+
+
+@Language.factory(
+    "coordination_splitter", requires=["token.dep", "token.tag", "token.pos"]
+)
+def make_coordination_splitter(nlp: Language, name: str):
+    """Make a CoordinationSplitter component.
+
+    The default splitting rules include:
+    - split_noun_coordination
+
+    Args:
+        nlp (Language): The spaCy Language object.
+        name (str): The name of the component.
+
+    RETURNS (CoordinationSplitter): The CoordinationSplitter component.
+
+    DOCS: xxx
+    """
+    return CoordinationSplitter(nlp.vocab, name=name)
+
+
+class CoordinationSplitter(Pipe):
+    def __init__(
+        self,
+        vocab: Vocab,
+        name: str = "coordination_splitter",
+        rules: Optional[
+            List[Union[SplittingRule, Callable[[Doc], Union[List[str], None]]]]
+        ] = None,
+    ) -> None:
+        self.name = name
+        self.vocab = vocab
+        if rules is None:
+            default_rules = [
+                split_noun_coordination,
+            ]
+            self.rules = [SplittingRule(function=rule) for rule in default_rules]
+        else:
+            self.rules = [
+                rule
+                if isinstance(rule, SplittingRule)
+                else SplittingRule(function=rule)
+                for rule in rules
+            ]
+
+    def clear_rules(self) -> None:
+        """Clear all splitting rules."""
+        self.rules = []
+
+    def add_default_rules(self) -> None:
+        """Restore the default splitting rules."""
+        default_rules = [
+            split_noun_coordination,
+        ]
+        self.rules = [SplittingRule(function=rule) for rule in default_rules]
+
+    def add_rule(self, rule: Callable[[Doc], Union[List[str], None]]) -> None:
+        """Add a single splitting rule to the existing rules."""
+        validated_rule = SplittingRule(function=rule)
+        self.rules.append(validated_rule)
+
+    def add_rules(self, rules: List[Callable[[Doc], Union[List[str], None]]]) -> None:
+        """Add a list of splitting rules to the existing rules.
+
+        Args:
+            rules (List[Callable[[Doc], Union[List[str], None]]]): A list of
+                functions to be added as splitting rules.
+        """
+        for rule in rules:
+            # wrap each rule in a SplittingRule instance so it is validated
+            validated_rule = SplittingRule(function=rule)
+            self.rules.append(validated_rule)
+
+    def __call__(self, doc: Doc) -> Doc:
+        """Apply the splitting rules to the doc.
+
+        Args:
+            doc (Doc): The spaCy Doc object.
+
+        Returns:
+            Doc: The modified spaCy Doc object.
+        """
+        # the default rules only apply to English
+        if doc.lang_ != "en":
+            return doc
+
+        # the first rule that produces phrases wins: the doc is replaced by
+        # a new Doc whose words are the split phrases
+        for rule in self.rules:
+            split = rule.function(doc)
+            if split:
+                return Doc(doc.vocab, words=split)
+        return doc
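+
+
+# End-to-end usage sketch for the component (hedged; assumes an English
+# pipeline that sets POS tags and dependency parses -- with a blank "en"
+# pipeline the rules see no annotations and the doc is returned unchanged):
+#
+#     nlp.add_pipe("coordination_splitter")
+#     doc = nlp("red apples and oranges")
+#     [t.text for t in doc]  # -> ["red apples", "red oranges"]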
diff --git a/spacy/tests/pipeline/test_coordinationruler.py b/spacy/tests/pipeline/test_coordinationruler.py
new file mode 100644
index 00000000000..38bfc19e59a
--- /dev/null
+++ b/spacy/tests/pipeline/test_coordinationruler.py
@@ -0,0 +1,404 @@
+from typing import List
+
+import pytest
+
+import spacy
+from spacy.pipeline.coordinationruler import split_noun_coordination
+from spacy.tokens import Doc
+
+
+@pytest.fixture
+def nlp():
+    return spacy.blank("en")
+
+
+### CONSTRUCTION CASES ###
+@pytest.fixture
+def noun_construction_case1(nlp):
+    words = ["apples", "and", "oranges"]
+    spaces = [True, True, False]
+    pos_tags = ["NOUN", "CCONJ", "NOUN"]
+    dep_relations = ["nsubj", "cc", "conj"]
+
+    doc = Doc(nlp.vocab, words=words, spaces=spaces)
+
+    for token, pos, dep in zip(doc, pos_tags, dep_relations):
+        token.pos_ = pos
+        token.dep_ = dep
+
+    doc[1].head = doc[2]
+    doc[2].head = doc[0]
+    doc[0].head = doc[0]
+
+    return doc
+
+
+@pytest.fixture
+def noun_construction_case2(nlp):
+    words = ["red", "apples", "and", "oranges"]
+    spaces = [True, True, True, False]
+    pos_tags = ["ADJ", "NOUN", "CCONJ", "NOUN"]
+    dep_relations = ["amod", "nsubj", "cc", "conj"]
+
+    doc = Doc(nlp.vocab, words=words, spaces=spaces)
+
+    for token, pos, dep in zip(doc, pos_tags, dep_relations):
+        token.pos_ = pos
+        token.dep_ = dep
+
+    doc[0].head = doc[1]
+    doc[2].head = doc[3]
+    doc[3].head = doc[1]
+
+    return doc
+
+
+@pytest.fixture
+def noun_construction_case3(nlp):
+    words = ["apples", "and", "juicy", "oranges"]
+    spaces = [True, True, True, False]
+    pos_tags = ["NOUN", "CCONJ", "ADJ", "NOUN"]
+    dep_relations = ["nsubj", "cc", "amod", "conj"]
+
+    doc = Doc(nlp.vocab, words=words, spaces=spaces)
+
+    for token, pos, dep in zip(doc, pos_tags, dep_relations):
+        token.pos_ = pos
+        token.dep_ = dep
+
+    doc[0].head = doc[0]
+    doc[1].head = doc[3]
+    doc[2].head = doc[3]
+    doc[3].head = doc[0]
+
+    return doc
+
+
+@pytest.fixture
+def noun_construction_case4(nlp):
+    words = ["hot", "chicken", "wings", "and", "soup"]
+    spaces = [True, True, True, True, False]
+    pos_tags = ["ADJ", "NOUN", "NOUN", "CCONJ", "NOUN"]
+    dep_relations = ["amod", "compound", "ROOT", "cc", "conj"]
+
+    doc = Doc(nlp.vocab, words=words, spaces=spaces)
+
+    for token, pos, dep in zip(doc, pos_tags, dep_relations):
+        token.pos_ = pos
+        token.dep_ = dep
+
+    doc[0].head = doc[2]
+    doc[1].head = doc[2]
+    doc[2].head = doc[2]
+    doc[3].head = doc[4]
+    doc[4].head = doc[2]
+
+    return doc
+
+
+@pytest.fixture
+def noun_construction_case5(nlp):
+    words = ["green", "apples", "and", "rotten", "oranges"]
+    spaces = [True, True, True, True, False]
+    pos_tags = ["ADJ", "NOUN", "CCONJ", "ADJ", "NOUN"]
+    dep_relations = ["amod", "ROOT", "cc", "amod", "conj"]
+
+    doc = Doc(nlp.vocab, words=words, spaces=spaces)
+
+    for token, pos, dep in zip(doc, pos_tags, dep_relations):
+        token.pos_ = pos
+        token.dep_ = dep
+
+    doc[0].head = doc[1]
+    doc[1].head = doc[1]
+    doc[2].head = doc[4]
+    doc[3].head = doc[4]
+    doc[4].head = doc[1]
+
+    return doc
+
+
+@pytest.fixture
+def noun_construction_case6(nlp):
+    words = ["very", "green", "apples", "and", "oranges"]
+    spaces = [True, True, True, True, False]
+    pos_tags = ["ADV", "ADJ", "NOUN", "CCONJ", "NOUN"]
+    dep_relations = ["advmod", "amod", "ROOT", "cc", "conj"]
+
+    doc = Doc(nlp.vocab, words=words, spaces=spaces)
+
+    for token, pos, dep in zip(doc, pos_tags, dep_relations):
+        token.pos_ = pos
+        token.dep_ = dep
+
+    doc[0].head = doc[1]
+    doc[1].head = doc[2]
+    doc[2].head = doc[2]
+    doc[3].head = doc[4]
+    doc[4].head = doc[2]
+
+    return doc
+
+
+@pytest.fixture
+def noun_construction_case7(nlp):
+    words = ["fresh", "and", "juicy", "apples"]
+    spaces = [True, True, True, False]
+    pos_tags = ["ADJ", "CCONJ", "ADJ", "NOUN"]
+    dep_relations = ["amod", "cc", "conj", "ROOT"]
+
+    doc = Doc(nlp.vocab, words=words, spaces=spaces)
+
+    for token, pos, dep in zip(doc, pos_tags, dep_relations):
+        token.pos_ = pos
+        token.dep_ = dep
+
+    doc[0].head = doc[3]
+    doc[1].head = doc[2]
+    doc[2].head = doc[0]
+    doc[3].head = doc[3]
+
+    return doc
+
+
+@pytest.fixture
+def noun_construction_case8(nlp):
+    words = ["fresh", ",", "juicy", "and", "delicious", "apples"]
+    spaces = [True, True, True, True, True, False]
+    pos_tags = ["ADJ", "PUNCT", "ADJ", "CCONJ", "ADJ", "NOUN"]
+    dep_relations = ["amod", "punct", "conj", "cc", "conj", "ROOT"]
+
+    doc = Doc(nlp.vocab, words=words, spaces=spaces)
+
+    for token, pos, dep in zip(doc, pos_tags, dep_relations):
+        token.pos_ = pos
+        token.dep_ = dep
+
+    doc[0].head = doc[5]
+    doc[1].head = doc[2]
+    doc[2].head = doc[0]
+    doc[3].head = doc[4]
+    doc[4].head = doc[0]
+    doc[5].head = doc[5]
+
+    return doc
+
+
+@pytest.fixture
+def noun_construction_case9(nlp):
+    words = ["fresh", "and", "quite", "sour", "apples"]
+    spaces = [True, True, True, True, False]
+    pos_tags = ["ADJ", "CCONJ", "ADV", "ADJ", "NOUN"]
+    dep_relations = ["amod", "cc", "advmod", "conj", "ROOT"]
+
+    doc = Doc(nlp.vocab, words=words, spaces=spaces)
+
+    for token, pos, dep in zip(doc, pos_tags, dep_relations):
+        token.pos_ = pos
+        token.dep_ = dep
+
+    doc[0].head = doc[4]
+    doc[1].head = doc[3]
+    doc[2].head = doc[3]
+    doc[3].head = doc[0]
+    doc[4].head = doc[4]
+
+    return doc
+
+
+@pytest.fixture
+def noun_construction_case10(nlp):
+    words = ["fresh", "but", "quite", "sour", "apples", "and", "chicken", "wings"]
+    spaces = [True, True, True, True, True, True, True, False]
+    pos_tags = ["ADJ", "CCONJ", "ADV", "ADJ", "NOUN", "CCONJ", "NOUN", "NOUN"]
+    dep_relations = ["amod", "cc", "advmod", "amod", "ROOT", "cc", "compound", "conj"]
+
+    doc = Doc(nlp.vocab, words=words, spaces=spaces)
+
+    for token, pos, dep in zip(doc, pos_tags, dep_relations):
+        token.pos_ = pos
+        token.dep_ = dep
+
+    doc[0].head = doc[4]
+    doc[1].head = doc[4]
+    doc[2].head = doc[3]
+    doc[3].head = doc[4]
+    doc[5].head = doc[4]
+    doc[6].head = doc[7]
+    doc[7].head = doc[4]
+
+    return doc
+
+
+@pytest.fixture
+def noun_construction_case11(nlp):
+    words = ["water", "and", "power", "meters", "and", "electrical", "sockets"]
+    spaces = [True, True, True, True, True, True, False]
+    pos_tags = ["NOUN", "CCONJ", "NOUN", "NOUN", "CCONJ", "ADJ", "NOUN"]
+    dep_relations = ["compound", "cc", "compound", "ROOT", "cc", "amod", "conj"]
+
+    doc = Doc(nlp.vocab, words=words, spaces=spaces)
+
+    for token, pos, dep in zip(doc, pos_tags, dep_relations):
+        token.pos_ = pos
+        token.dep_ = dep
+
+    doc[0].head = doc[2]
+    doc[1].head = doc[2]
+    doc[2].head = doc[3]
+    doc[3].head = doc[3]
+    doc[4].head = doc[6]
+    doc[5].head = doc[6]
+    doc[6].head = doc[3]
+
+    return doc
+
+
+### splitting rules ###
+def _my_custom_splitting_rule(doc: Doc) -> List[str]:
+    split_phrases = []
+    for token in doc:
+        if token.text == "red":
+            split_phrases.append("test1")
+            split_phrases.append("test2")
+    return split_phrases
+
+
+# test split_noun_coordination on 11 different cases
+def test_split_noun_coordination(
+    noun_construction_case1,
+    noun_construction_case2,
+    noun_construction_case3,
+    noun_construction_case4,
+    noun_construction_case5,
+    noun_construction_case6,
+    noun_construction_case7,
+    noun_construction_case8,
+    noun_construction_case9,
+    noun_construction_case10,
+    noun_construction_case11,
+):
+    # test 1: no modifier - it should return None from _split_doc
+    case1_split = split_noun_coordination(noun_construction_case1)
+
+    assert case1_split is None
+
+    # test 2: modifier is at the beginning of the noun phrase
+    case2_split = split_noun_coordination(noun_construction_case2)
+
+    assert len(case2_split) == 2
+    assert isinstance(case2_split, list)
+    assert all(isinstance(phrase, str) for phrase in case2_split)
+    assert case2_split == ["red apples", "red oranges"]
+
+    # test 3: modifier is at the end of the noun phrase
+    case3_split = split_noun_coordination(noun_construction_case3)
+
+    assert len(case3_split) == 2
+    assert isinstance(case3_split, list)
+    assert all(isinstance(phrase, str) for phrase in case3_split)
+    assert case3_split == ["juicy oranges", "juicy apples"]
+
+    # test 4: deal with compound nouns
+    case4_split = split_noun_coordination(noun_construction_case4)
+
+    assert len(case4_split) == 2
+    assert isinstance(case4_split, list)
+    assert all(isinstance(phrase, str) for phrase in case4_split)
+    assert case4_split == ["hot chicken wings", "hot soup"]
+
+    # test 5: same number of modifiers as nouns
+    # case5_split = split_noun_coordination(noun_construction_case5)
+    # assert case5_split == None
+
+    # test 6: modifier phrases
+    case6_split = split_noun_coordination(noun_construction_case6)
+
+    assert len(case6_split) == 2
+    assert isinstance(case6_split, list)
+    assert all(isinstance(phrase, str) for phrase in case6_split)
+    assert case6_split == ["very green apples", "very green oranges"]
+
+    ## test cases for coordinating adjectives
+
+    # test 7:
+    case7_split = split_noun_coordination(noun_construction_case7)
+    assert case7_split == ["fresh apples", "juicy apples"]
+
+    # test 8:
+    case8_split = split_noun_coordination(noun_construction_case8)
+    assert case8_split == ["fresh apples", "juicy apples", "delicious apples"]
+
+    # test 9:
+    case9_split = split_noun_coordination(noun_construction_case9)
+    assert case9_split == ["fresh apples", "quite sour apples"]
+
+    # test 10:
+    case10_split = split_noun_coordination(noun_construction_case10)
+    assert case10_split == [
+        "fresh apples",
+        "quite sour apples",
+        "fresh chicken wings",
+        "quite sour chicken wings",
+    ]
+
+    # test 11: smoke test only -- no assertions yet for this construction
+    split_noun_coordination(noun_construction_case11)
+
+
+################### test factory ##############################
+
+
+def test_coordinationruler(nlp, noun_construction_case2):
+    assert len(noun_construction_case2) == 4
+    assert [d.text for d in noun_construction_case2] == [
+        "red",
+        "apples",
+        "and",
+        "oranges",
+    ]
+
+    coord_splitter = nlp.add_pipe("coordination_splitter")
+    assert len(coord_splitter.rules) == 1
+    assert coord_splitter.name == "coordination_splitter"
+    doc_split = coord_splitter(noun_construction_case2)
+    assert len(doc_split) == 2
+    assert [t.text for t in doc_split] == ["red apples", "red oranges"]
+
+
+def test_coordinationruler_clear_rules(nlp):
+    coord_splitter = nlp.add_pipe("coordination_splitter")
+    assert len(coord_splitter.rules) == 1
+    coord_splitter.clear_rules()
+    assert len(coord_splitter.rules) == 0
+    assert coord_splitter.rules == []
+
+
+def test_coordinationruler_add_rule(nlp):
+    coord_splitter = nlp.add_pipe("coordination_splitter")
+    assert len(coord_splitter.rules) == 1
+    coord_splitter.add_rule(_my_custom_splitting_rule)
+    assert len(coord_splitter.rules) == 2
+
+
+def test_coordinationruler_add_rules(nlp, noun_construction_case2):
+    coord_splitter = nlp.add_pipe("coordination_splitter")
+    coord_splitter.clear_rules()
+    coord_splitter.add_rules([_my_custom_splitting_rule, _my_custom_splitting_rule])
+    assert len(coord_splitter.rules) == 2
+    doc_split = coord_splitter(noun_construction_case2)
+    assert len(doc_split) == 2
+
+    assert [t.text for t in doc_split] == ["test1", "test2"]
+
+
+def test_coordinationruler_add_default_rules(nlp):
+    coord_splitter = nlp.add_pipe("coordination_splitter")
+    coord_splitter.clear_rules()
+    assert len(coord_splitter.rules) == 0
+    coord_splitter.add_default_rules()
+    assert len(coord_splitter.rules) == 1
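+
+
+# Custom-rule usage sketch (hedged; mirrors the tests above): the component
+# accepts plain callables and wraps each one in a validated SplittingRule:
+#
+#     coord_splitter = nlp.add_pipe("coordination_splitter")
+#     coord_splitter.clear_rules()
+#     coord_splitter.add_rule(_my_custom_splitting_rule)
+#     coord_splitter(doc)  # for a doc containing "red", a Doc built from
+#                          # ["test1", "test2"]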