From a33f20d3925c8bf91e3fed9bf7e78be4d4b06e36 Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Thu, 23 Nov 2023 18:03:06 +0100 Subject: [PATCH 1/3] bug #31: code cherry picked from aiortc_adapter to better handle an ice candidate --- src/gst_signalling/gst_abstract_role.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index 0d404ea..a276fff 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -10,6 +10,8 @@ import pyee from typing import Any, Dict, NamedTuple, Optional +from aiortc.sdp import candidate_from_sdp + from .gst_signalling import GstSignalling @@ -118,10 +120,15 @@ async def peer_for_session( await pc.setRemoteDescription(obj) elif "ice" in message: - obj = object_from_string(json.dumps(message["ice"])) - if isinstance(obj, RTCIceCandidate): - self.logger.info("Received ice candidate") - pc.addIceCandidate(obj) + message = message["ice"] + if message["candidate"] == "": + self.logger.info("Received empty candidate, ignoring") + return None + + obj = candidate_from_sdp(message["candidate"].split(":", 1)[1]) + obj.sdpMLineIndex = message["sdpMLineIndex"] + + await pc.addIceCandidate(obj) async def close_session(self, session_id: str) -> None: session = self.sessions.pop(session_id) From bdaa5421d4c760787db01b3782d56958eebef094 Mon Sep 17 00:00:00 2001 From: Pierre Rouanet Date: Fri, 24 Nov 2023 12:08:47 +0100 Subject: [PATCH 2/3] Support both possibilities. --- src/gst_signalling/gst_abstract_role.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index a276fff..fe23167 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -120,15 +120,24 @@ async def peer_for_session( await pc.setRemoteDescription(obj) elif "ice" in message: - message = message["ice"] - if message["candidate"] == "": - self.logger.info("Received empty candidate, ignoring") - return None + if "type" in message and message["type"] == "candidate": + obj = object_from_string(json.dumps(message["ice"])) - obj = candidate_from_sdp(message["candidate"].split(":", 1)[1]) - obj.sdpMLineIndex = message["sdpMLineIndex"] + if isinstance(obj, RTCIceCandidate): + self.logger.info(f"Received ice candidate {obj}") + await pc.addIceCandidate(obj) - await pc.addIceCandidate(obj) + else: + message = message["ice"] + if message["candidate"] == "": + self.logger.info("Received empty candidate, ignoring") + return None + + obj = candidate_from_sdp(message["candidate"].split(":", 1)[1]) + obj.sdpMLineIndex = message["sdpMLineIndex"] + + self.logger.info(f"Received ice candidate {obj}") + await pc.addIceCandidate(obj) async def close_session(self, session_id: str) -> None: session = self.sessions.pop(session_id) From 7020925c80630ccf53d96132e316297835dd0905 Mon Sep 17 00:00:00 2001 From: Pierre Rouanet Date: Fri, 24 Nov 2023 12:20:25 +0100 Subject: [PATCH 3/3] Mypy is just getting angrier. --- src/gst_signalling/aiortc_adapter.py | 12 ++++++------ src/gst_signalling/gst_abstract_role.py | 16 ++++++++-------- src/gst_signalling/gst_listener.py | 2 +- src/gst_signalling/utils.py | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/gst_signalling/aiortc_adapter.py b/src/gst_signalling/aiortc_adapter.py index 758eb94..6b5686b 100644 --- a/src/gst_signalling/aiortc_adapter.py +++ b/src/gst_signalling/aiortc_adapter.py @@ -57,7 +57,7 @@ def __init__( self._setup(remote_producer_peer_id) def _setup(self, remote_producer_peer_id: Optional[str] = None) -> None: - @self.signalling.on("Welcome") + @self.signalling.on("Welcome") # type: ignore[arg-type] def on_welcome(peer_id: str) -> None: self.logger.info(f"Welcome received, peer_id: {peer_id}") self.peer_id = peer_id @@ -72,18 +72,18 @@ def on_welcome(peer_id: str) -> None: ) self._setup_consumer(remote_producer_peer_id) - @self.signalling.on("Peer") + @self.signalling.on("Peer") # type: ignore[arg-type] async def on_peer(session_id: str, message: dict[str, Any]) -> None: assert self.session_id == session_id obj = self._parse_peer_message(message) if obj is not None: await self.peer_msg_queue.put(obj) - @self.signalling.on("EndSession") + @self.signalling.on("EndSession") # type: ignore[arg-type] async def on_end_session(session_id: str) -> None: await self.peer_msg_queue.put(BYE) - @self.signalling.on("Error") + @self.signalling.on("Error") # type: ignore[arg-type] async def on_error(details: str) -> None: self.logger.error(f'Connection closed with error: "{details}"') await self.peer_msg_queue.put(BYE) @@ -114,7 +114,7 @@ def _parse_peer_message( return None def _setup_producer(self) -> None: - @self.signalling.on("StartSession") + @self.signalling.on("StartSession") # type: ignore[arg-type] def on_start_session(peer_id: str, session_id: str) -> None: self.logger.info( f"StartSession received, peer_id: {peer_id}, session_id: {session_id}" @@ -125,7 +125,7 @@ def on_start_session(peer_id: str, session_id: str) -> None: def _setup_consumer(self, remote_producer_peer_id: str) -> None: self.remote_producer_peer_id = remote_producer_peer_id - @self.signalling.on("SessionStarted") + @self.signalling.on("SessionStarted") # type: ignore[arg-type] def on_session_started(peer_id: str, session_id: str) -> None: self.logger.info( f"SessionStarted received, peer_id: {peer_id}, session_id: {session_id}" diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index fe23167..bd7a53a 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -42,29 +42,29 @@ def __init__( self.sessions: Dict[str, GstSession] = {} - @signalling.on("Welcome") + @signalling.on("Welcome") # type: ignore[arg-type] def on_welcome(peer_id: str) -> None: self.peer_id = peer_id self.peer_id_evt.set() - @signalling.on("StartSession") + @signalling.on("StartSession") # type: ignore[arg-type] async def on_start_session(peer_id: str, session_id: str) -> None: self.logger.info(f"StartSession received, session_id: {session_id}") await self.setup_session(session_id, peer_id) - @signalling.on("SessionStarted") + @signalling.on("SessionStarted") # type: ignore[arg-type] async def on_session_started(peer_id: str, session_id: str) -> None: self.logger.info(f"SessionStarted received, session_id: {session_id}") await self.setup_session(session_id, peer_id) - @signalling.on("Peer") + @signalling.on("Peer") # type: ignore[arg-type] async def on_peer(session_id: str, message: Dict[str, Dict[str, Any]]) -> None: self.logger.info( f"Peer received, session_id: {session_id}, message: {message}" ) await self.peer_for_session(session_id, message) - @signalling.on("EndSession") + @signalling.on("EndSession") # type: ignore[arg-type] async def on_end_session(session_id: str) -> None: self.logger.info(f"EndSession received, session_id: {session_id}") await self.close_session(session_id) @@ -120,7 +120,7 @@ async def peer_for_session( await pc.setRemoteDescription(obj) elif "ice" in message: - if "type" in message and message["type"] == "candidate": + if "type" in message and str(message["type"]) == "candidate": obj = object_from_string(json.dumps(message["ice"])) if isinstance(obj, RTCIceCandidate): @@ -129,11 +129,11 @@ async def peer_for_session( else: message = message["ice"] - if message["candidate"] == "": + if str(message["candidate"]) == "": self.logger.info("Received empty candidate, ignoring") return None - obj = candidate_from_sdp(message["candidate"].split(":", 1)[1]) + obj = candidate_from_sdp(str(message["candidate"]).split(":", 1)[1]) obj.sdpMLineIndex = message["sdpMLineIndex"] self.logger.info(f"Received ice candidate {obj}") diff --git a/src/gst_signalling/gst_listener.py b/src/gst_signalling/gst_listener.py index fa3a4f6..ac30986 100644 --- a/src/gst_signalling/gst_listener.py +++ b/src/gst_signalling/gst_listener.py @@ -9,7 +9,7 @@ def __init__(self, host: str, port: int, name: str) -> None: GstSignallingAbstractRole.__init__(self, host=host, port=port) self.name = name - @self.signalling.on("PeerStatusChanged") + @self.signalling.on("PeerStatusChanged") # type: ignore[arg-type] def on_peer_status_changed( peer_id: str, roles: List[str], diff --git a/src/gst_signalling/utils.py b/src/gst_signalling/utils.py index f296f84..b783a06 100644 --- a/src/gst_signalling/utils.py +++ b/src/gst_signalling/utils.py @@ -22,7 +22,7 @@ async def get_list() -> Dict[str, Dict[str, str]]: signalling = GstSignalling(host=host, port=port) await signalling.connect() - @signalling.on("List") + @signalling.on("List") # type: ignore[arg-type] def on_list(found_producers: Dict[str, Dict[str, str]]) -> None: producers.update(found_producers) got_it.set()