Skip to content

Commit

Permalink
Close connection and thread when model session exits
Browse files Browse the repository at this point in the history
Currently when creating a pipe, the server doesn't close the connection nor the created thread when the listening loop exits. This results to EOFError as we can see in the output when running the tests.
  • Loading branch information
thodkatz committed Sep 9, 2024
1 parent aac6768 commit 5e75d22
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions tiktorch/rpc/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ def __init__(self, api, conn: Connection):
self._logger = None
self._conn = conn
self._results_queue = queue.Queue()
self._start_result_sender(conn)
self._start_result_sender = threading.Thread(target=self._sender, name="MPResultSender")
self._start_result_sender.start()

@property
def logger(self):
Expand All @@ -281,20 +282,17 @@ def logger(self):
def _send(self, msg):
self._results_queue.put(msg)

def _start_result_sender(self, conn):
def _sender():
while True:
try:
result = self._results_queue.get()
if result is self._sentinel:
break

conn.send(result)
except Exception:
self.logger.exception("Error in result sender")
def _sender(self):
while True:
try:
result = self._results_queue.get()
if result is self._sentinel:
self.logger.debug("[MPServer] result sender thread stopped")
break

t = threading.Thread(target=_sender, name="MPResultSender")
t.start()
self._conn.send(result)
except Exception:
self.logger.exception("Error in result sender")

def _send_result(self, fut):
if fut.cancelled():
Expand All @@ -317,7 +315,7 @@ def _make_future(self):
return f

def _call_method(self, call: MethodCall):
self.logger.debug("[id: %s] Recieved '%s' method call", call.id, call.method_name)
self.logger.debug("[id: %s] Received '%s' method call", call.id, call.method_name)
fut = self._make_future()
self._futures.put(call.id, fut)

Expand All @@ -341,7 +339,7 @@ def _call_method(self, call: MethodCall):
return fut

def _cancel_request(self, cancel: Cancellation):
self.logger.debug("[id: %s] Recieved cancel request", cancel.id)
self.logger.debug("[id: %s] Received cancel request", cancel.id)
fut = self._futures.pop_id(cancel.id)
if fut:
fut.cancel()
Expand Down Expand Up @@ -373,3 +371,7 @@ def listen(self):
break
except Exception:
self.logger.error("Error in main loop", exc_info=1)

self._start_result_sender.join()
self._conn.close()
self.logger.debug("Closing connection")

0 comments on commit 5e75d22

Please sign in to comment.