Skip to content

Commit

Permalink
workers manager is now allowed to be used as a context manager
Browse files Browse the repository at this point in the history
  • Loading branch information
bgunnar5 committed Dec 6, 2023
1 parent ce945d0 commit ab322b2
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 12 deletions.
36 changes: 33 additions & 3 deletions tests/celery_test_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,18 @@
import signal
import subprocess
from time import sleep
from typing import Dict, List
from types import TracebackType
from typing import Dict, List, Type

from celery import Celery


class CeleryTestWorkersManager:
"""
A class to handle the setup and teardown of celery workers.
This should be treated as a context and used with python's
built-in 'with' statement. If you use it without this statement,
beware that the processes spun up here may never be stopped.
"""

def __init__(self, app: Celery):
Expand All @@ -52,6 +56,31 @@ def __init__(self, app: Celery):
self.worker_processes = {}
self.echo_processes = {}

def __enter__(self):
"""This magic method is necessary for allowing this class to be used as a context manager."""
return self

def __exit__(self, exc_type: Type[Exception], exc_value: Exception, traceback: TracebackType):
"""
This will always run at the end of a context with statement, even if an error is raised.
It's a safe way to ensure all of our subprocesses are stopped no matter what.
"""

# Try to stop everything gracefully first
self.stop_all_workers()

# Check that all the worker processes were stopped, otherwise forcefully terminate them
for worker_process in self.worker_processes.values():
if worker_process.is_alive():
worker_process.kill()

# Check that all the echo processes were stopped, otherwise forcefully terminate them
ps_proc = subprocess.run("ps ux", shell=True, capture_output=True, text=True)
for pid in self.echo_processes.values():
if str(pid) in ps_proc.stdout:
print(f"pid: {pid}")
os.kill(pid, signal.SIGKILL)

def _is_worker_ready(self, worker_name: str, verbose: bool = False) -> bool:
"""
Check to see if the worker is up and running yet.
Expand Down Expand Up @@ -147,7 +176,7 @@ def launch_worker(self, worker_name: str, queues: List[str], concurrency: int =
shell=True,
preexec_fn=os.setpgrp, # Make this the parent of the group so we can kill the 'sleep inf' that's spun up
)
self.echo_processes[worker_name] = echo_process
self.echo_processes[worker_name] = echo_process.pid

# Start the worker in a separate process since it'll take control of the entire process until we kill it
worker_process = multiprocessing.Process(target=self.start_worker, args=(worker_launch_cmd,))
Expand Down Expand Up @@ -192,7 +221,8 @@ def stop_worker(self, worker_name: str):
self.worker_processes[worker_name].kill()

# Terminate the echo process and its sleep inf subprocess
os.killpg(os.getpgid(self.echo_processes[worker_name].pid), signal.SIGTERM)
os.killpg(os.getpgid(self.echo_processes[worker_name]), signal.SIGTERM)
sleep(2)

def stop_all_workers(self):
"""
Expand Down
12 changes: 3 additions & 9 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,6 @@ def launch_workers(celery_app: Celery, worker_queue_map: Dict[str, str]): # pyl
# (basically just add in concurrency value to worker_queue_map)
worker_info = {worker_name: {"concurrency": 1, "queues": [queue]} for worker_name, queue in worker_queue_map.items()}

# Create our workers manager and launch our workers
workers_manager = CeleryTestWorkersManager(celery_app)
workers_manager.launch_workers(worker_info)

# Yield control to the tests that need workers launched
yield

# Tests are done so shut down all of our workers
workers_manager.stop_all_workers()
with CeleryTestWorkersManager(celery_app) as workers_manager:
workers_manager.launch_workers(worker_info)
yield

0 comments on commit ab322b2

Please sign in to comment.