diff --git a/tests/celery_test_workers.py b/tests/celery_test_workers.py
index 805ffc090..357c7a53b 100644
--- a/tests/celery_test_workers.py
+++ b/tests/celery_test_workers.py
@@ -36,7 +36,8 @@
 import signal
 import subprocess
 from time import sleep
-from typing import Dict, List
+from types import TracebackType
+from typing import Dict, List, Type
 
 from celery import Celery
 
@@ -44,6 +45,9 @@
 class CeleryTestWorkersManager:
     """
     A class to handle the setup and teardown of celery workers.
+    This should be treated as a context and used with python's
+    built-in 'with' statement. If you use it without this statement,
+    beware that the processes spun up here may never be stopped.
     """
 
     def __init__(self, app: Celery):
@@ -52,6 +56,31 @@ def __init__(self, app: Celery):
         self.worker_processes = {}
         self.echo_processes = {}
 
+    def __enter__(self):
+        """This magic method is necessary for allowing this class to be used as a context manager."""
+        return self
+
+    def __exit__(self, exc_type: Type[Exception], exc_value: Exception, traceback: TracebackType):
+        """
+        This will always run at the end of a context with statement, even if an error is raised.
+        It's a safe way to ensure all of our subprocesses are stopped no matter what.
+        """
+
+        # Try to stop everything gracefully first
+        self.stop_all_workers()
+
+        # Check that all the worker processes were stopped, otherwise forcefully terminate them
+        for worker_process in self.worker_processes.values():
+            if worker_process.is_alive():
+                worker_process.kill()
+
+        # Check that all the echo processes were stopped, otherwise forcefully terminate them
+        ps_proc = subprocess.run("ps ux", shell=True, capture_output=True, text=True)
+        for pid in self.echo_processes.values():
+            if str(pid) in ps_proc.stdout:
+                print(f"pid: {pid}")
+                os.kill(pid, signal.SIGKILL)
+
     def _is_worker_ready(self, worker_name: str, verbose: bool = False) -> bool:
         """
         Check to see if the worker is up and running yet.
@@ -147,7 +176,7 @@ def launch_worker(self, worker_name: str, queues: List[str], concurrency: int =
             shell=True,
             preexec_fn=os.setpgrp,  # Make this the parent of the group so we can kill the 'sleep inf' that's spun up
         )
-        self.echo_processes[worker_name] = echo_process
+        self.echo_processes[worker_name] = echo_process.pid
 
         # Start the worker in a separate process since it'll take control of the entire process until we kill it
         worker_process = multiprocessing.Process(target=self.start_worker, args=(worker_launch_cmd,))
@@ -192,7 +221,8 @@ def stop_worker(self, worker_name: str):
             self.worker_processes[worker_name].kill()
 
         # Terminate the echo process and its sleep inf subprocess
-        os.killpg(os.getpgid(self.echo_processes[worker_name].pid), signal.SIGTERM)
+        os.killpg(os.getpgid(self.echo_processes[worker_name]), signal.SIGTERM)
+        sleep(2)
 
     def stop_all_workers(self):
         """
diff --git a/tests/conftest.py b/tests/conftest.py
index 43cbc80bf..38c6b0334 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -204,12 +204,6 @@ def launch_workers(celery_app: Celery, worker_queue_map: Dict[str, str]):  # pyl
     # (basically just add in concurrency value to worker_queue_map)
     worker_info = {worker_name: {"concurrency": 1, "queues": [queue]} for worker_name, queue in worker_queue_map.items()}
 
-    # Create our workers manager and launch our workers
-    workers_manager = CeleryTestWorkersManager(celery_app)
-    workers_manager.launch_workers(worker_info)
-
-    # Yield control to the tests that need workers launched
-    yield
-
-    # Tests are done so shut down all of our workers
-    workers_manager.stop_all_workers()
+    with CeleryTestWorkersManager(celery_app) as workers_manager:
+        workers_manager.launch_workers(worker_info)
+        yield
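For reference, here is a minimal standalone sketch (not part of the diff; DummyManager is a hypothetical stand-in for CeleryTestWorkersManager) showing why the context-manager protocol used above guarantees teardown: Python invokes __exit__ even when the body of the 'with' block raises, so a failing test can no longer leak worker or echo processes.

from types import TracebackType
from typing import Type


class DummyManager:
    """Hypothetical stand-in that mirrors the __enter__/__exit__ pattern added in the diff."""

    def __enter__(self):
        print("setup: workers would be launched here")
        return self

    def __exit__(self, exc_type: Type[Exception], exc_value: Exception, traceback: TracebackType):
        # Runs on normal exit *and* when an exception escapes the with block
        print("teardown: workers would be stopped here")
        # Returning None (falsy) lets the original exception keep propagating


try:
    with DummyManager():
        raise RuntimeError("simulated test failure")
except RuntimeError:
    pass  # teardown already ran before the exception reached us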