Skip to content

Commit

Permalink
Merge pull request #5 from uds-se/0.2.18
Browse files Browse the repository at this point in the history
added a pytest_main for collecting tests
  • Loading branch information
smythi93 authored Jul 25, 2024
2 parents 203fb63 + 5875fc4 commit a49c65f
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 74 deletions.
Empty file.
82 changes: 82 additions & 0 deletions src/sflkit/runners/resources/pytest_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import os
import sys
from typing import Optional, Union, List, Sequence

# noinspection PyPackageRequirements
from _pytest._code import ExceptionInfo

# noinspection PyPackageRequirements
from _pytest._io import TerminalWriter

# noinspection PyPackageRequirements
from _pytest.config import (
ExitCode,
_prepareconfig,
ConftestImportFailure,
UsageError,
filter_traceback_for_conftest_import_failure,
_PluggyPlugin,
)


def pytest_collection_finish(session):
"""
Called after collection has been performed and modified.
"""
file = os.getenv("SFLKIT_PYTEST_COLLECTION_FINISH_FILE", "tmp_sflkit_pytest")
with open(file, "w") as f:
for item in session.items:
f.write(item.nodeid + "\n")


def main(
args: Optional[Union[List[str], "os.PathLike[str]"]] = None,
plugins: Optional[Sequence[Union[str, _PluggyPlugin]]] = None,
) -> Union[int, ExitCode]:
"""Perform an in-process test run.
:param args: List of command line arguments.
:param plugins: List of plugin objects to be auto-registered during initialization.
:returns: An exit code.
"""
try:
try:
config = _prepareconfig(args, plugins)
except ConftestImportFailure as e:
exc_info = ExceptionInfo.from_exc_info(e.excinfo)
tw = TerminalWriter(sys.stderr)
tw.line(f"ImportError while loading conftest '{e.path}'.", red=True)
exc_info.traceback = exc_info.traceback.filter(
filter_traceback_for_conftest_import_failure
)
exc_repr = (
exc_info.getrepr(style="short", chain=False)
if exc_info.traceback
else exc_info.exconly()
)
formatted_tb = str(exc_repr)
for line in formatted_tb.splitlines():
tw.line(line.rstrip(), red=True)
return ExitCode.USAGE_ERROR
else:
try:
config.hook.pytest_collection_finish = pytest_collection_finish
ret: Union[ExitCode, int] = config.hook.pytest_cmdline_main(
config=config
)
try:
return ExitCode(ret)
except ValueError:
return ret
finally:
config._ensure_unconfigure()
except UsageError as e:
tw = TerminalWriter(sys.stderr)
for msg in e.args:
tw.line(f"ERROR: {msg}\n", red=True)
return ExitCode.USAGE_ERROR


if __name__ == "__main__":
main(sys.argv[1:])
110 changes: 37 additions & 73 deletions src/sflkit/runners/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import abc
import enum
import hashlib
import json
import os
import re
import shutil
Expand All @@ -10,6 +9,8 @@
from pathlib import Path
from typing import List, Dict, Optional, Tuple

from sflkit.logger import LOGGER

Environment = Dict[str, str]

PYTEST_RESULT_PATTERN = re.compile(
Expand Down Expand Up @@ -119,6 +120,15 @@ class VoidRunner(Runner):


class PytestRunner(Runner):
def __init__(
self,
re_filter: str = r".*",
timeout=DEFAULT_TIMEOUT,
set_python_path: bool = False,
):
super().__init__(re_filter, timeout)
self.set_python_path = set_python_path

@staticmethod
def _common_base(directory: Path, tests: List[str]) -> Path:
parts = directory.parts
Expand Down Expand Up @@ -175,37 +185,40 @@ def get_tests(
root_dir = directory / base
else:
root_dir = directory
# install collector
subprocess.check_call(
[
"python3",
"-m",
"pip",
"install",
"pytest-collect-formatter",
],
env=environ,
cwd=directory,
)
tmp_json = os.path.abspath(f"tmp_{id(self)}.json")
subprocess.run(
tmp = Path(f"tmp_sflkit_pytest_tmp_{id(self)}").absolute()
pytest_main = (
Path(__file__).parent.resolve() / "resources" / "pytest_main.py"
).absolute()
environ = dict(environ or os.environ)
environ["SFLKIT_PYTEST_COLLECTION_FINISH_FILE"] = str(tmp)
if self.set_python_path:
if "PYTHONPATH" in environ:
environ["PYTHONPATH"] = str(directory) + ":" + environ["PYTHONPATH"]
else:
environ["PYTHONPATH"] = str(directory)
process = subprocess.run(
[
"python3",
"-m",
"pytest",
str(pytest_main),
"--collect-only",
f"--collect-output-file={tmp_json}",
"--collect-format=json",
"--collect-type=path",
]
+ c,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=environ,
cwd=directory,
)
with open(tmp_json, "r") as f:
tests = self.parse_tests(json.load(f))
os.remove(tmp_json)
return self._normalize_paths(tests, directory, root_dir)
LOGGER.info(f"pytest collection finished with {process.returncode}")
if not tmp.exists():
tmp = directory / "tmp_sflkit_pytest"
if tmp.exists():
with open(tmp, "r") as f:
tests = [line.strip() for line in f.readlines()]
os.remove(tmp)
return self._normalize_paths(tests, directory, root_dir)
else:
LOGGER.warn(f"Could not find {tmp}")
return []

@staticmethod
def __get_pytest_result__(
Expand Down Expand Up @@ -248,55 +261,6 @@ def run_test(
else:
return TestResult.UNDEFINED

def parse_tests(self, tests_json: List | Dict):
if isinstance(tests_json, dict):
tests = self._parse_dict(tests_json)
elif isinstance(tests_json, list):
tests = self._parse_list(tests_json)
else:
tests = list()
clean_tests = list()
for test in tests:
if test.startswith(os.sep):
test = test[len(os.sep) :]
elif test.startswith("::"):
test = test[2:]
clean_tests.append(test)
return clean_tests

def _parse_list(self, test_list: List[List | Dict]) -> List[str]:
tests = list()
for test in test_list:
if isinstance(test, dict):
tests += self._parse_dict(test)
elif isinstance(test, list):
tests += self._parse_list(test)
return tests

def _parse_dict(self, test: Dict) -> List[str]:
children = test.get("children", [])
type_ = test.get("type", "")
if not type_:
return []
title = test.get("title", "")
if not title:
return []
if children:
children = self._parse_list(children)
if type_ == "path":
sep = os.sep
elif type_ == "pytest_unit":
sep = "::"
else:
return []
return [f"{sep}{title}{child}" for child in children]
elif type_ == "pytest_unit":
return ["::" + title]
elif type_ == "path":
return [os.sep + title]
else:
return []


class UnittestRunner(Runner):
pass
Expand Down
2 changes: 1 addition & 1 deletion tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_runner(self):
exclude="tests",
)
instrument_config(config)
runner = PytestRunner()
runner = PytestRunner(set_python_path=True)
output = Path(BaseTest.TEST_DIR, "events").absolute()
runner.run(
Path(BaseTest.TEST_DIR), output, files=[Path("tests", "test_middle.py")]
Expand Down

0 comments on commit a49c65f

Please sign in to comment.