Skip to content

Commit

Permalink
Make Language.pipe workers exit cleanly
Browse files Browse the repository at this point in the history
Also warn when any worker exited with a non-zero exit code and modify
test to ensure that workers exit cleanly by default.
  • Loading branch information
danieldk committed Feb 12, 2024
1 parent 14bd9d8 commit 98d54a9
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 3 deletions.
1 change: 1 addition & 0 deletions spacy/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ class Warnings(metaclass=ErrorsWithCodes):
"key attribute for vectors, configure it through Vectors(attr=) or "
"'spacy init vectors --attr'")
W126 = ("These keys are unsupported: {unsupported}")
W127 = ("Not all `Language.pipe` worker processes completed successfully")


class Errors(metaclass=ErrorsWithCodes):
Expand Down
5 changes: 5 additions & 0 deletions spacy/language.py
Original file line number Diff line number Diff line change
Expand Up @@ -1730,6 +1730,9 @@ def prepare_input(
for proc in procs:
proc.join()

if not all(proc.exitcode == 0 for proc in procs):
warnings.warn(Warnings.W127)

def _link_components(self) -> None:
"""Register 'listeners' within pipeline components, to allow them to
effectively share weights.
Expand Down Expand Up @@ -2350,6 +2353,7 @@ def _apply_pipes(
if isinstance(texts_with_ctx, _WorkDoneSentinel):
sender.close()
receiver.close()
return

docs = (
ensure_doc(doc_like, context) for doc_like, context in texts_with_ctx
Expand All @@ -2375,6 +2379,7 @@ def _apply_pipes(
# stop processing.
sender.close()
receiver.close()
return


class _Sender:
Expand Down
11 changes: 8 additions & 3 deletions spacy/tests/test_language.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import itertools
import warnings
import logging
from unittest import mock

Expand Down Expand Up @@ -738,9 +739,13 @@ def test_pass_doc_to_pipeline(nlp, n_process):
assert doc.text == texts[0]
assert len(doc.cats) > 0
if isinstance(get_current_ops(), NumpyOps) or n_process < 2:
docs = nlp.pipe(docs, n_process=n_process)
assert [doc.text for doc in docs] == texts
assert all(len(doc.cats) for doc in docs)
# Catch warnings to ensure that all worker processes exited
# succesfully.
with warnings.catch_warnings():
warnings.simplefilter("error")
docs = nlp.pipe(docs, n_process=n_process)
assert [doc.text for doc in docs] == texts
assert all(len(doc.cats) for doc in docs)


def test_invalid_arg_to_pipeline(nlp):
Expand Down

0 comments on commit 98d54a9

Please sign in to comment.