From bc960ed541a09bb80563e10121b58bade003785e Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Thu, 21 Nov 2024 14:56:18 +0100 Subject: [PATCH] enhancement #53: emit end of session --- src/gst_signalling/gst_abstract_role.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index 9627de2..c93e668 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -16,6 +16,7 @@ "GstSession", [ ("peer_id", str), + ("session_id", str), ("pc", Gst.Element), # type '__gi__.GstWebRTCBin' ], ) @@ -35,6 +36,7 @@ def __init__( self.peer_id: Optional[str] = None self.peer_id_evt = asyncio.Event() + self.consume_evt = asyncio.Event() self._asyncloop = asyncio.get_event_loop() self.sessions: Dict[str, GstSession] = {} @@ -104,17 +106,19 @@ async def connect(self) -> None: async def close(self) -> None: await self.signalling.close() + self.consume_evt.set() async def consume(self) -> None: - while True: - await asyncio.sleep(1000) + # while True: + # await asyncio.sleep(1000) + await self.consume_evt.wait() # Session management async def setup_session(self, session_id: str, peer_id: str) -> GstSession: self.logger.info("setup session") pc = self.init_webrtc(session_id) - session = GstSession(peer_id, pc) + session = GstSession(peer_id, session_id, pc) self.sessions[session_id] = session @@ -130,11 +134,14 @@ def handle_ice_message(self, webrtc: Gst.Element, ice_msg: Dict[str, Any]) -> No async def close_session(self, session_id: str) -> None: self.logger.info("close session") - - session = self.sessions.pop(session_id) - self._pipeline.remove(session.pc) - session.pc.set_state(Gst.State.NULL) - self.emit("close_session", session) + try: + session = self.sessions.pop(session_id) + self._pipeline.remove(session.pc) + session.pc.set_state(Gst.State.NULL) + await self.signalling.end_session(session_id) + # self.emit("close_session", session) + except KeyError: + self.logger.warning("Session not found") async def send_sdp(self, session_id: str, sdp: Dict[str, Dict[str, str]]) -> None: await self.signalling.send_peer_message(session_id, "sdp", sdp)