Skip to content

Commit

Permalink
Merge pull request #265 from DUNE-DAQ/plasorak/faster-controller-stop
Browse files Browse the repository at this point in the history
Die quicker, controller
  • Loading branch information
plasorak authored Oct 17, 2024
2 parents e4f484c + a2fcb19 commit 8c70ed7
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
11 changes: 8 additions & 3 deletions src/drunc/connectivity_service/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ def __init__(self, session:str, address:str):
# assume the simplest case here
self.address = f'http://{address}'

def retract(self, uid):
def retract(self, uid, data_type:str, break_on_unreachable:bool=False):
from drunc.utils.utils import http_post
data = {
'partition': self.session,
'connections': [
{
'connection_id': uid,
'data_type': 'run-control-messages',
'data_type': data_type,
}
]
}
Expand All @@ -59,9 +59,14 @@ def retract(self, uid):
r.raise_for_status()
break
except (HTTPError, ConnectionError) as e:

if break_on_unreachable and isinstance(e, ConnectionError):
self.logger.warning('Connectivity service seems unreachable, assuming it\'s already been killed')
break

from time import sleep
sleep(0.5)
continue




Expand Down
6 changes: 5 additions & 1 deletion src/drunc/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def terminate(self):
if self.connectivity_service_thread:
self.connectivity_service_thread.join()
self.logger.info('Unregistering from the connectivity service')
self.connectivity_service.retract(self.name+"_control")
self.connectivity_service.retract(self.name+"_control", 'RunControlMessage', break_on_unreachable=True)

if self.can_broadcast():
self.broadcast(
Expand All @@ -332,6 +332,10 @@ def terminate(self):
if ResponseListener.exists():
ResponseListener.get().terminate()

import logging
if self.logger.level != logging.DEBUG:
return

import threading
self.logger.debug("Threading threads")
for t in threading.enumerate():
Expand Down

0 comments on commit 8c70ed7

Please sign in to comment.