diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index ca9ceb4..e6af4ce 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -9,10 +9,10 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Set up Python 3.8 + - name: Set up Python 3.10 uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.10" cache: 'pip' # caching pip dependencies - name: Install dependencies run: | diff --git a/src/gst_signalling/aiortc_adapter.py b/src/gst_signalling/aiortc_adapter.py index d994ddd..758eb94 100644 --- a/src/gst_signalling/aiortc_adapter.py +++ b/src/gst_signalling/aiortc_adapter.py @@ -75,27 +75,9 @@ def on_welcome(peer_id: str) -> None: @self.signalling.on("Peer") async def on_peer(session_id: str, message: dict[str, Any]) -> None: assert self.session_id == session_id - - if "sdp" in message: - message = message["sdp"] - elif "ice" in message: - message = message["ice"] - else: - raise ValueError(f"Invalid message {message}") - - if "type" in message: - obj = object_from_string(json.dumps(message)) - elif "candidate" in message: - if message["candidate"] == "": - self.logger.info(f"Received empty candidate, ignoring") - return - - obj = candidate_from_sdp(message["candidate"].split(":", 1)[1]) - obj.sdpMLineIndex = message["sdpMLineIndex"] - else: - self.logger.error(f"Failed to parse message: {message}") - return - await self.peer_msg_queue.put(obj) + obj = self._parse_peer_message(message) + if obj is not None: + await self.peer_msg_queue.put(obj) @self.signalling.on("EndSession") async def on_end_session(session_id: str) -> None: @@ -106,6 +88,31 @@ async def on_error(details: str) -> None: self.logger.error(f'Connection closed with error: "{details}"') await self.peer_msg_queue.put(BYE) + def _parse_peer_message( + self, message: dict[str, Any] + ) -> Optional[Union[RTCSessionDescription, RTCIceCandidate]]: + if "sdp" in message: + message = message["sdp"] + elif "ice" in message: + message = message["ice"] + else: + raise ValueError(f"Invalid message {message}") + + if "type" in message: + obj = object_from_string(json.dumps(message)) + return obj + elif "candidate" in message: + if message["candidate"] == "": + self.logger.info("Received empty candidate, ignoring") + return None + + obj = candidate_from_sdp(message["candidate"].split(":", 1)[1]) + obj.sdpMLineIndex = message["sdpMLineIndex"] + return obj + else: + self.logger.error(f"Failed to parse message: {message}") + return None + def _setup_producer(self) -> None: @self.signalling.on("StartSession") def on_start_session(peer_id: str, session_id: str) -> None: