From 5e75d2227133eadc13915d3d97719719273e0170 Mon Sep 17 00:00:00 2001 From: Theodoros Katzalis Date: Mon, 9 Sep 2024 14:42:53 +0200 Subject: [PATCH] Close connection and thread when model session exits Currently when creating a pipe, the server doesn't close the connection nor the created thread when the listening loop exits. This results to EOFError as we can see in the output when running the tests. --- tiktorch/rpc/mp.py | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/tiktorch/rpc/mp.py b/tiktorch/rpc/mp.py index 0cd69bf4..992ed761 100644 --- a/tiktorch/rpc/mp.py +++ b/tiktorch/rpc/mp.py @@ -270,7 +270,8 @@ def __init__(self, api, conn: Connection): self._logger = None self._conn = conn self._results_queue = queue.Queue() - self._start_result_sender(conn) + self._start_result_sender = threading.Thread(target=self._sender, name="MPResultSender") + self._start_result_sender.start() @property def logger(self): @@ -281,20 +282,17 @@ def logger(self): def _send(self, msg): self._results_queue.put(msg) - def _start_result_sender(self, conn): - def _sender(): - while True: - try: - result = self._results_queue.get() - if result is self._sentinel: - break - - conn.send(result) - except Exception: - self.logger.exception("Error in result sender") + def _sender(self): + while True: + try: + result = self._results_queue.get() + if result is self._sentinel: + self.logger.debug("[MPServer] result sender thread stopped") + break - t = threading.Thread(target=_sender, name="MPResultSender") - t.start() + self._conn.send(result) + except Exception: + self.logger.exception("Error in result sender") def _send_result(self, fut): if fut.cancelled(): @@ -317,7 +315,7 @@ def _make_future(self): return f def _call_method(self, call: MethodCall): - self.logger.debug("[id: %s] Recieved '%s' method call", call.id, call.method_name) + self.logger.debug("[id: %s] Received '%s' method call", call.id, call.method_name) fut = self._make_future() self._futures.put(call.id, fut) @@ -341,7 +339,7 @@ def _call_method(self, call: MethodCall): return fut def _cancel_request(self, cancel: Cancellation): - self.logger.debug("[id: %s] Recieved cancel request", cancel.id) + self.logger.debug("[id: %s] Received cancel request", cancel.id) fut = self._futures.pop_id(cancel.id) if fut: fut.cancel() @@ -373,3 +371,7 @@ def listen(self): break except Exception: self.logger.error("Error in main loop", exc_info=1) + + self._start_result_sender.join() + self._conn.close() + self.logger.debug("Closing connection")