From ca911b0ac02aa466ab4f20c4dd3300cfbf577b20 Mon Sep 17 00:00:00 2001 From: Charles Baynham Date: Tue, 26 May 2020 12:46:06 +0100 Subject: [PATCH] client: Improve handling of failed server connections * Handle closed connections from the server. * Refactor BestEffortClient to reuse logic from Client via composition * Add a thread to BestEffortClient which regularly checks for connectivity and attempts to reestablish it if it is lost --- sipyco/pc_rpc.py | 328 ++++++++++++++++++++++++------------- sipyco/test/test_pc_rpc.py | 137 +++++++++++++++- 2 files changed, 339 insertions(+), 126 deletions(-) diff --git a/sipyco/pc_rpc.py b/sipyco/pc_rpc.py index 5ced066..93d90e2 100644 --- a/sipyco/pc_rpc.py +++ b/sipyco/pc_rpc.py @@ -22,7 +22,7 @@ from sipyco.monkey_patches import * from sipyco import pyon from sipyco.asyncio_tools import AsyncioServer as _AsyncioServer -from sipyco.packed_exceptions import * +from sipyco.packed_exceptions import current_exc_packed, raise_packed_exc logger = logging.getLogger(__name__) @@ -39,6 +39,13 @@ class IncompatibleServer(Exception): pass +class RPCConnectionError(ConnectionError): + """Raised by the client when a method call fails due to connection + problems with the RPC server. + """ + pass + + _init_string = b"ARTIQ pc_rpc\n" @@ -74,14 +81,18 @@ class Client: automatically attempted. The user must call :meth:`~sipyco.pc_rpc.Client.close_rpc` to free resources properly after initialization completes successfully. + If the remote server shuts down during operation, RPCConnectionError is + raised by future calls to Client methods. The user should call + :meth:`~sipyco.pc_rpc.Client.close_rpc` and then discard this object. + :param host: Identifier of the server. The string can represent a hostname or a IPv4 or IPv6 address (see ``socket.create_connection`` in the Python standard library). :param port: TCP port to use. :param target_name: Target name to select. ``IncompatibleServer`` is raised if the target does not exist.
- Use :class:`.AutoTarget` for automatic selection if the server has only one - target. + Use :class:`.AutoTarget` for automatic selection if the server has + only one target. Use ``None`` to skip selecting a target. The list of targets can then be retrieved using :meth:`~sipyco.pc_rpc.Client.get_rpc_id` and then one can be selected later using :meth:`~sipyco.pc_rpc.Client.select_rpc_target`. @@ -93,10 +104,11 @@ class Client: in the middle of a RPC can break subsequent RPCs (from the same client). """ - def __init__(self, host, port, target_name=AutoTarget, timeout=None): self.__socket = socket.create_connection((host, port), timeout) + self.__closed = False + try: self.__socket.sendall(_init_string) @@ -133,30 +145,70 @@ def get_local_host(self): """Returns the address of the local end of the connection.""" return self.__socket.getsockname()[0] + def get_valid_methods(self): + """Returns a set of names of methods which can be called on + the server""" + return self.__valid_methods.copy() + + def is_closed(self): + """Return True if the connection to the server has been closed + + This method will actively query the server for data to check + if the connection is working. """ + self.__closed = not self.__socket_is_open() + + return self.__closed + def close_rpc(self): """Closes the connection to the RPC server. No further method calls should be done after this method is called. """ + self.__closed = True self.__socket.close() + def __socket_is_open(self): + # Consensus seems to be that there's no reliable way to check a socket + # for disconnection without writing to it / receiving from it. 
We'll + # use the "get_rpc_method_list" action as a test of the connection since + # all Servers are guaranteed to have it available + if self.__closed: + return False + + try: + self.get_rpc_method_list() + except socket.error: + return False + else: + return True + def __send(self, obj): line = pyon.encode(obj) + "\n" self.__socket.sendall(line.encode()) def __recv(self): - buf = self.__socket.recv(4096).decode() - while "\n" not in buf: - more = self.__socket.recv(4096) - if not more: - break - buf += more.decode() + if not self.__closed: + buf = self.__socket.recv(4096).decode() + while "\n" not in buf: + more = self.__socket.recv(4096) + if not more: + break + buf += more.decode() + if self.__closed or not buf: + self.__closed = True + raise RPCConnectionError("Connection closed by server") return pyon.decode(buf) def __do_action(self, action): - self.__send(action) + try: + self.__send(action) + obj = self.__recv() + except ConnectionError as e: + if isinstance(e, RPCConnectionError): + raise e + else: + raise RPCConnectionError("Failure in RPC comms") - obj = self.__recv() if obj["status"] == "ok": return obj["ret"] elif obj["status"] == "failed": @@ -174,7 +226,7 @@ def get_rpc_method_list(self): def __getattr__(self, name): if name not in self.__valid_methods: - raise AttributeError + raise AttributeError("{} not found".format(name)) def proxy(*args, **kwargs): return self.__do_rpc(name, args, kwargs) @@ -185,23 +237,27 @@ class AsyncioClient: """This class is similar to :class:`sipyco.pc_rpc.Client`, but uses ``asyncio`` instead of blocking calls. - All RPC methods are coroutines. + All RPC methods are coroutines. As with :class:`sipyco.pc_rpc.Client`, + methods will raise RPCConnectionError if the server closes the + connection. The user should call + :meth:`~sipyco.pc_rpc.AsyncioClient.close_rpc` and then discard this + object. Concurrent access from different asyncio tasks is supported; all calls use a single lock.
""" - def __init__(self): self.__lock = asyncio.Lock() self.__reader = None self.__writer = None self.__target_names = None self.__description = None + self.__closed = False async def connect_rpc(self, host, port, target_name): """Connects to the server. This cannot be done in __init__ because - this method is a coroutine. See :class:`sipyco.pc_rpc.Client` for a description of the - parameters.""" + this method is a coroutine. See :class:`sipyco.pc_rpc.Client` for a + description of the parameters.""" self.__reader, self.__writer = \ await asyncio.open_connection(host, port, limit=100*1024*1024) try: @@ -231,6 +287,31 @@ def get_selected_target(self): selected yet.""" return self.__selected_target + async def is_closed(self): + """Return True if the connection to the server has been closed""" + self.__closed = not await self.__socket_is_open() + + return self.__closed + + async def get_rpc_method_list(self): + obj = {"action": "get_rpc_method_list"} + return await self.__do_action(obj) + + async def __socket_is_open(self): + # Consensus seems to be that there's no reliable way to check a socket + # for disconnection without writing to it / receiving from it. 
We'll + # use the "get_rpc_method_list" action as a test of the connection since + # all Servers are guaranteed to have it available + if self.__closed: + return False + + try: + await self.get_rpc_method_list() + except socket.error: + return False + else: + return True + def get_local_host(self): """Returns the address of the local end of the connection.""" return self.__writer.get_extra_info("socket").getsockname()[0] @@ -257,14 +338,16 @@ def __send(self, obj): self.__writer.write(line.encode()) async def __recv(self): - line = await self.__reader.readline() + if not self.__closed: + line = await self.__reader.readline() + if self.__closed or not line: + self.__closed = True + raise RPCConnectionError("Connection closed by server") return pyon.decode(line.decode()) - async def __do_rpc(self, name, args, kwargs): + async def __do_action(self, obj): await self.__lock.acquire() try: - obj = {"action": "call", "name": name, - "args": args, "kwargs": kwargs} self.__send(obj) obj = await self.__recv() @@ -277,9 +360,13 @@ async def __do_rpc(self, name, args, kwargs): finally: self.__lock.release() + async def __do_rpc(self, name, args, kwargs): + obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs} + return await self.__do_action(obj) + def __getattr__(self, name): if name not in self.__valid_methods: - raise AttributeError + raise AttributeError("{} not found".format(name)) async def proxy(*args, **kwargs): res = await self.__do_rpc(name, args, kwargs) @@ -295,10 +382,13 @@ class BestEffortClient: RPC calls that failed because of network errors return ``None``. Other RPC calls are blocking and return the correct value. + This class will launch a Thread which monitors for a failed connection in + the background. If this occurs the connection will be reopened if possible. + :param firstcon_timeout: Timeout to use during the first (blocking) connection attempt at object initialization.
- :param retry: Amount of time to wait between retries when reconnecting - in the background. + :param retry: Amount of time to wait between background connectivity + checks / attempts to reconnect. """ def __init__(self, host, port, target_name, @@ -308,120 +398,125 @@ def __init__(self, host, port, target_name, self.__target_name = target_name self.__retry = retry - self.__conretry_terminate = False - self.__socket = None + self.__conmonitor_terminate = False self.__valid_methods = set() + self.__client = None # type: Client + self.__client_lock = threading.RLock() + try: self.__coninit(firstcon_timeout) - except: + except ConnectionError: logger.warning("first connection attempt to %s:%d[%s] failed, " "retrying in the background", self.__host, self.__port, self.__target_name, exc_info=True) - self.__start_conretry() - else: - self.__conretry_thread = None + + self.__start_conmonitor() def __coninit(self, timeout): - if timeout is None: - self.__socket = socket.create_connection( - (self.__host, self.__port)) + try: + self.__client = Client(self.__host, self.__port, + self.__target_name, timeout) + except: # noqa + self.__client = None + raise else: - self.__socket = socket.create_connection( - (self.__host, self.__port), timeout) - self.__socket.settimeout(None) - self.__socket.sendall(_init_string) - server_identification = self.__recv() - target_name = _validate_target_name(self.__target_name, - server_identification["targets"]) - self.__socket.sendall((target_name + "\n").encode()) - self.__valid_methods = self.__recv() + # Get the valid methods so we can check if an attribute exists + # even if the client is later deleted due to connection error + self.__valid_methods = self.__client.get_valid_methods() - def __start_conretry(self): - self.__conretry_thread = threading.Thread(target=self.__conretry) - self.__conretry_thread.start() + def __start_conmonitor(self): + self.__conmonitor_thread = threading.Thread(target=self.__conmonitor) + 
self.__conmonitor_thread.start() - def __conretry(self): + def __conmonitor(self): while True: - try: - self.__coninit(None) - except: - if self.__conretry_terminate: - break - time.sleep(self.__retry) - else: + if self.__client: + # The client should be connected: check that the socket + # is still open (i.e. the server has not disconnected) + with self.__client_lock: + if self.__client.is_closed(): + self.__mark_invalid() + + if not self.__client: + # The client is disconnected. Try to reconnect + try: + self.__coninit(None) + except Exception: + pass + else: + logger.warning( + "connection to %s:%d[%s] established in " + "the background", + self.__host, self.__port, self.__target_name + ) + + if self.__conmonitor_terminate: + logger.debug("Terminate request received: closing client and " + "shutting down monitoring thread") + if self.__client is not None: + with self.__client_lock: + self.__client.close_rpc() + + self.__conmonitor_thread = None break - if not self.__conretry_terminate: - logger.warning("connection to %s:%d[%s] established in " - "the background", - self.__host, self.__port, self.__target_name) - if self.__conretry_terminate and self.__socket is not None: - self.__socket.close() - # must be after __socket.close() to avoid race condition - self.__conretry_thread = None + + time.sleep(self.__retry) + + def __mark_invalid(self): + logger.warning("connection to %s:%d[%s] failed. " + "It will be retried in the background", + self.__host, self.__port, self.__target_name) + try: + self.__client.close_rpc() + except ConnectionError: + pass + finally: + self.__client = None def close_rpc(self): """Closes the connection to the RPC server. No further method calls should be done after this method is called. """ - if self.__conretry_thread is None: - if self.__socket is not None: - self.__socket.close() - else: - # Let the thread complete I/O and then do the socket closing. - # Python fails to provide a way to cancel threads... 
- self.__conretry_terminate = True + # Let the thread complete I/O and then do the socket closing. + # Python fails to provide a way to cancel threads... + self.__conmonitor_terminate = True - def __send(self, obj): - line = pyon.encode(obj) + "\n" - self.__socket.sendall(line.encode()) + def is_closed(self): + if not self.__client: + return True - def __recv(self): - buf = self.__socket.recv(4096).decode() - while "\n" not in buf: - more = self.__socket.recv(4096) - if not more: - break - buf += more.decode() - return pyon.decode(buf) - - def __do_rpc(self, name, args, kwargs): - if self.__conretry_thread is not None: - return None - - obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs} - try: - self.__send(obj) - obj = self.__recv() - except: - logger.warning("connection failed while attempting " - "RPC to %s:%d[%s], re-establishing connection " - "in the background", - self.__host, self.__port, self.__target_name) - self.__start_conretry() - return None - else: - if obj["status"] == "ok": - return obj["ret"] - elif obj["status"] == "failed": - raise_packed_exc(obj["exception"]) - else: - raise ValueError + with self.__client_lock: + return self.__client.is_closed() def __getattr__(self, name): - if name not in self.__valid_methods: - raise AttributeError - - def proxy(*args, **kwargs): - return self.__do_rpc(name, args, kwargs) - return proxy + """ + If the client is connected, pass this call onto the client. If not, + return a method that returns None if the call was valid or if we can't + tell. Raise AttributeError otherwise. 
+ """ - def get_selected_target(self): - raise NotImplementedError + if ( + self.__valid_methods + and name not in self.__valid_methods + and name not in dir(Client) + ): + raise AttributeError + else: + # If the client exists, pass the call to the client + def tryer(*args, **kwargs): + if not self.__client: + return None - def get_local_host(self): - raise NotImplementedError + with self.__client_lock: + client_method = getattr(self.__client, name) + try: + return client_method(*args, **kwargs) + except RPCConnectionError: + self.__mark_invalid() + return None + return tryer def _format_arguments(arguments): @@ -479,7 +574,6 @@ class Server(_AsyncioServer): :param allow_parallel: Allow concurrent asyncio calls to the target's methods. """ - def __init__(self, targets, description=None, builtin_terminate=False, allow_parallel=False): _AsyncioServer.__init__(self) @@ -497,12 +591,12 @@ def __init__(self, targets, description=None, builtin_terminate=False, def _document_function(function): """ Turn a function into a tuple of its arguments and documentation. - + Allows remote inspection of what methods are available on a local device. - + Args: function (Callable): a Python function to be documented. - + Returns: Tuple[dict, str]: tuple of (argument specifications, function documentation). 
@@ -610,7 +704,7 @@ async def _handle_connection_cr(self, reader, writer): if not line: break reply = await self._process_and_pyonize(target, - pyon.decode(line.decode())) + pyon.decode(line.decode())) writer.write((reply + "\n").encode()) except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError): # May happens on Windows when client disconnects diff --git a/sipyco/test/test_pc_rpc.py b/sipyco/test/test_pc_rpc.py index 44a8cc4..ad58968 100644 --- a/sipyco/test/test_pc_rpc.py +++ b/sipyco/test/test_pc_rpc.py @@ -46,15 +46,23 @@ def _blocking_echo(self, target, die_using_sys_exit=False): self.assertEqual(test_object, test_object_back) test_object_back = remote.async_echo(test_object) self.assertEqual(test_object, test_object_back) + with self.assertRaises(TypeError): + remote.return_unserializable() + with self.assertRaises(ValueError): + remote.raise_value_error() + with self.assertRaises(ConnectionError): + remote.raise_conn_error() with self.assertRaises(AttributeError): remote.non_existing_method if die_using_sys_exit: - # If the server dies and just drops the connection, we - # expect a client-side error due to lack of data. 
- with self.assertRaises(SyntaxError): + # If the server dies and drops the connection, the + # client should notice + with self.assertRaises(pc_rpc.RPCConnectionError): remote.raise_sys_exit() + self.assertTrue(remote.is_closed()) else: remote.terminate() + self.assertTrue(remote.is_closed()) finally: remote.close_rpc() @@ -64,10 +72,16 @@ def test_blocking_echo(self): def test_sys_exit(self): self._run_server_and_test(self._blocking_echo, "test", True) + def test_sys_exit_asyncio(self): + self._run_server_and_test(self._loop_asyncio_echo, "test", True) + + def test_sys_exit_best_effort(self): + self._run_server_and_test(self._best_effort_echo, "test", True) + def test_blocking_echo_autotarget(self): self._run_server_and_test(self._blocking_echo, pc_rpc.AutoTarget) - async def _asyncio_echo(self, target): + async def _asyncio_echo(self, target, die_using_sys_exit): remote = pc_rpc.AsyncioClient() for attempt in range(100): await asyncio.sleep(.2) @@ -84,17 +98,28 @@ async def _asyncio_echo(self, target): self.assertEqual(test_object, test_object_back) with self.assertRaises(TypeError): await remote.return_unserializable() + with self.assertRaises(ValueError): + await remote.raise_value_error() + with self.assertRaises(ConnectionError): + await remote.raise_conn_error() with self.assertRaises(AttributeError): await remote.non_existing_method - await remote.terminate() + if die_using_sys_exit: + with self.assertRaises(pc_rpc.RPCConnectionError): + await remote.raise_sys_exit() + self.assertTrue(await remote.is_closed()) + else: + await remote.terminate() + self.assertTrue(await remote.is_closed()) finally: remote.close_rpc() - def _loop_asyncio_echo(self, target): + def _loop_asyncio_echo(self, target, die_using_sys_exit=False): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: - loop.run_until_complete(self._asyncio_echo(target)) + loop.run_until_complete( + self._asyncio_echo(target, die_using_sys_exit)) finally: loop.close() @@ -104,6 +129,93 @@ 
def test_asyncio_echo(self): def test_asyncio_echo_autotarget(self): self._run_server_and_test(self._loop_asyncio_echo, pc_rpc.AutoTarget) + def _best_effort_echo(self, target, die_using_sys_exit=False): + for attempt in range(100): + time.sleep(.2) + try: + pc_rpc.Client(test_address, test_port, + target) + break + except ConnectionRefusedError: + pass + + remote = pc_rpc.BestEffortClient(test_address, test_port, + target) + + try: + test_object_back = remote.echo(test_object) + self.assertEqual(test_object, test_object_back) + test_object_back = remote.async_echo(test_object) + self.assertEqual(test_object, test_object_back) + with self.assertRaises(AttributeError): + remote.non_existing_method + with self.assertRaises(ValueError): + remote.raise_value_error() + with self.assertRaises(ConnectionError): + remote.raise_conn_error() + if die_using_sys_exit: + self.assertIsNone(remote.raise_sys_exit()) + self.assertIsNone(remote.echo(test_object)) + self.assertTrue(remote.is_closed()) + else: + remote.terminate() + self.assertIsNone(remote.echo(test_object)) + self.assertTrue(remote.is_closed()) + finally: + remote.close_rpc() + + def _best_effort_fail_reconnect(self): + target = "test" + + for attempt in range(100): + time.sleep(.2) + try: + pc_rpc.Client(test_address, test_port, + target) + break + except ConnectionRefusedError: + pass + + remote = pc_rpc.BestEffortClient(test_address, test_port, + target, retry=1.0) + + try: + # Check it's working + test_object_back = remote.echo(test_object) + self.assertEqual(test_object, test_object_back) + + # Kill the server + self.assertIsNone(remote.raise_sys_exit()) + self.assertTrue(remote.is_closed()) + + # Restart it and try for 5s to run a method + def test_reconnected(): + for _ in range(5): + time.sleep(1) + test_object_back = remote.echo(test_object) + if test_object_back is not None: + break + + self.assertEqual(test_object, test_object_back) + + # Kill it again + remote.terminate() + 
self.assertTrue(remote.is_closed()) + + self._run_server_and_test(test_reconnected) + + finally: + remote.close_rpc() + + def test_best_effort_fail_reconnect(self): + self._run_server_and_test(self._best_effort_fail_reconnect) + + def test_best_effort_echo(self): + self._run_server_and_test(self._best_effort_echo, "test") + + def test_best_effort_autotarget(self): + self._run_server_and_test(self._best_effort_echo, pc_rpc.AutoTarget) + def test_rpc_encode_function(self): """Test that `pc_rpc` can encode a function properly. @@ -112,7 +224,7 @@ def test_rpc_encode_function(self): """ def _annotated_function( - arg1: str, arg2: np.ndarray = np.array([1,]) + arg1: str, arg2: np.ndarray = np.array([1, ]) ) -> np.ndarray: """Sample docstring.""" return arg1 @@ -124,7 +236,8 @@ def _annotated_function( # purposefully ignore how argspec["annotations"] is treated. # allows option to change PYON later to encode annotations. - argspec_master = dict(inspect.getfullargspec(_annotated_function)._asdict()) + argspec_master = dict(inspect.getfullargspec( + _annotated_function)._asdict()) argspec_without_annotation = argspec_master.copy() del argspec_without_annotation["annotations"] # check if all items (excluding annotations) are same in both dictionaries @@ -143,6 +256,12 @@ def raise_sys_exit(self): def echo(self, x): return x + def raise_value_error(self): + raise ValueError + + def raise_conn_error(self): + raise ConnectionError + async def async_echo(self, x): await asyncio.sleep(0.01) return x