From 6fb32db8789a9ad461e432758169f7dba1026021 Mon Sep 17 00:00:00 2001 From: Martin Larralde Date: Thu, 17 Aug 2023 12:19:02 +0200 Subject: [PATCH] Add a dummy `ThreadPool` implementation for unavailable `multiprocessing.pool` --- pronto/parsers/base.py | 8 ++++++-- pronto/parsers/obo.py | 3 +-- pronto/parsers/obojson.py | 3 +-- pronto/utils/pool.py | 35 +++++++++++++++++++++++++++++++++++ 4 files changed, 43 insertions(+), 6 deletions(-) create mode 100644 pronto/utils/pool.py diff --git a/pronto/parsers/base.py b/pronto/parsers/base.py index 6848425..d9ae256 100644 --- a/pronto/parsers/base.py +++ b/pronto/parsers/base.py @@ -1,6 +1,5 @@ import abc import functools -import multiprocessing.pool import operator import os import typing @@ -9,6 +8,7 @@ from ..logic.lineage import Lineage from ..ontology import Ontology +from ..utils.pool import ThreadPool class BaseParser(abc.ABC): @@ -27,6 +27,10 @@ def parse_from( ) -> None: return NotImplemented + @classmethod + def pool(cls, threads: int) -> ThreadPool: + return ThreadPool(threads) + @classmethod def process_import( cls, @@ -66,7 +70,7 @@ def process_imports( basepath=basepath, timeout=timeout, ) - with multiprocessing.pool.ThreadPool(threads) as pool: + with cls.pool(threads) as pool: return dict(pool.map(lambda i: (i, process(i)), imports)) _entities = { diff --git a/pronto/parsers/obo.py b/pronto/parsers/obo.py index cab4e43..22d551f 100644 --- a/pronto/parsers/obo.py +++ b/pronto/parsers/obo.py @@ -1,4 +1,3 @@ -import multiprocessing.pool import os import fastobo @@ -38,7 +37,7 @@ def parse_from(self, handle, threads=None): # Extract frames from the current document. with typechecked.disabled(): try: - with multiprocessing.pool.ThreadPool(threads) as pool: + with self.pool(threads) as pool: pool.map(self.extract_entity, doc) except SyntaxError as s: location = self.ont.path, s.lineno, s.offset, s.text diff --git a/pronto/parsers/obojson.py b/pronto/parsers/obojson.py index 6702a36..707e6f2 100644 --- a/pronto/parsers/obojson.py +++ b/pronto/parsers/obojson.py @@ -1,4 +1,3 @@ -import multiprocessing.pool import os import fastobo @@ -39,7 +38,7 @@ def parse_from(self, handle, threads=None): # Extract frames from the current document. with typechecked.disabled(): try: - with multiprocessing.pool.ThreadPool(threads) as pool: + with self.pool(threads) as pool: pool.map(self.extract_entity, doc) except SyntaxError as err: location = self.ont.path, err.lineno, err.offset, err.text diff --git a/pronto/utils/pool.py b/pronto/utils/pool.py new file mode 100644 index 0000000..643de13 --- /dev/null +++ b/pronto/utils/pool.py @@ -0,0 +1,35 @@ +import typing +from typing import Callable, Iterable, List + +try: + from multiprocessing.pool import ThreadPool as _ThreadPool +except ImportError: + _ThreadPool = None # type: ignore + + +_T = typing.TypeVar("_T") +_U = typing.TypeVar("_U") + + +class ThreadPool(object): + + def __init__(self, threads: int = 0): + self.threads = threads + self.pool = None if _ThreadPool is None else _ThreadPool(self.threads) + + def __enter__(self) -> "Pool": + if self.pool is not None: + self.pool.__enter__() + return self + + def __exit__(self, exc_val, exc_ty, tb): + if self.pool is not None: + return self.pool.__exit__(exc_val, exc_ty, tb) + return False + + def map(self, func: Callable[[_T], _U], items: Iterable[_T]) -> List[_U]: + if self.pool is None: + return list(map(func, items)) + else: + return self.pool.map(func, items) +