Skip to content

Commit

Permalink
Merge pull request #32 from pollen-robotics/31-keyerror-type-when-rec…
Browse files Browse the repository at this point in the history
…eiving-a-ice-candidate

bug #31: code cherry picked from aiortc_adapter to better handle an ice candidate
  • Loading branch information
pierre-rouanet authored Nov 24, 2023
2 parents b78db91 + 7020925 commit 7dbe4e2
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 17 deletions.
12 changes: 6 additions & 6 deletions src/gst_signalling/aiortc_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(
self._setup(remote_producer_peer_id)

def _setup(self, remote_producer_peer_id: Optional[str] = None) -> None:
@self.signalling.on("Welcome")
@self.signalling.on("Welcome") # type: ignore[arg-type]
def on_welcome(peer_id: str) -> None:
self.logger.info(f"Welcome received, peer_id: {peer_id}")
self.peer_id = peer_id
Expand All @@ -72,18 +72,18 @@ def on_welcome(peer_id: str) -> None:
)
self._setup_consumer(remote_producer_peer_id)

@self.signalling.on("Peer")
@self.signalling.on("Peer") # type: ignore[arg-type]
async def on_peer(session_id: str, message: dict[str, Any]) -> None:
assert self.session_id == session_id
obj = self._parse_peer_message(message)
if obj is not None:
await self.peer_msg_queue.put(obj)

@self.signalling.on("EndSession")
@self.signalling.on("EndSession") # type: ignore[arg-type]
async def on_end_session(session_id: str) -> None:
await self.peer_msg_queue.put(BYE)

@self.signalling.on("Error")
@self.signalling.on("Error") # type: ignore[arg-type]
async def on_error(details: str) -> None:
self.logger.error(f'Connection closed with error: "{details}"')
await self.peer_msg_queue.put(BYE)
Expand Down Expand Up @@ -114,7 +114,7 @@ def _parse_peer_message(
return None

def _setup_producer(self) -> None:
@self.signalling.on("StartSession")
@self.signalling.on("StartSession") # type: ignore[arg-type]
def on_start_session(peer_id: str, session_id: str) -> None:
self.logger.info(
f"StartSession received, peer_id: {peer_id}, session_id: {session_id}"
Expand All @@ -125,7 +125,7 @@ def on_start_session(peer_id: str, session_id: str) -> None:
def _setup_consumer(self, remote_producer_peer_id: str) -> None:
self.remote_producer_peer_id = remote_producer_peer_id

@self.signalling.on("SessionStarted")
@self.signalling.on("SessionStarted") # type: ignore[arg-type]
def on_session_started(peer_id: str, session_id: str) -> None:
self.logger.info(
f"SessionStarted received, peer_id: {peer_id}, session_id: {session_id}"
Expand Down
34 changes: 25 additions & 9 deletions src/gst_signalling/gst_abstract_role.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import pyee
from typing import Any, Dict, NamedTuple, Optional

from aiortc.sdp import candidate_from_sdp


from .gst_signalling import GstSignalling

Expand Down Expand Up @@ -40,29 +42,29 @@ def __init__(

self.sessions: Dict[str, GstSession] = {}

@signalling.on("Welcome")
@signalling.on("Welcome") # type: ignore[arg-type]
def on_welcome(peer_id: str) -> None:
self.peer_id = peer_id
self.peer_id_evt.set()

@signalling.on("StartSession")
@signalling.on("StartSession") # type: ignore[arg-type]
async def on_start_session(peer_id: str, session_id: str) -> None:
self.logger.info(f"StartSession received, session_id: {session_id}")
await self.setup_session(session_id, peer_id)

@signalling.on("SessionStarted")
@signalling.on("SessionStarted") # type: ignore[arg-type]
async def on_session_started(peer_id: str, session_id: str) -> None:
self.logger.info(f"SessionStarted received, session_id: {session_id}")
await self.setup_session(session_id, peer_id)

@signalling.on("Peer")
@signalling.on("Peer") # type: ignore[arg-type]
async def on_peer(session_id: str, message: Dict[str, Dict[str, Any]]) -> None:
self.logger.info(
f"Peer received, session_id: {session_id}, message: {message}"
)
await self.peer_for_session(session_id, message)

@signalling.on("EndSession")
@signalling.on("EndSession") # type: ignore[arg-type]
async def on_end_session(session_id: str) -> None:
self.logger.info(f"EndSession received, session_id: {session_id}")
await self.close_session(session_id)
Expand Down Expand Up @@ -118,10 +120,24 @@ async def peer_for_session(
await pc.setRemoteDescription(obj)

elif "ice" in message:
obj = object_from_string(json.dumps(message["ice"]))
if isinstance(obj, RTCIceCandidate):
self.logger.info("Received ice candidate")
pc.addIceCandidate(obj)
if "type" in message and str(message["type"]) == "candidate":
obj = object_from_string(json.dumps(message["ice"]))

if isinstance(obj, RTCIceCandidate):
self.logger.info(f"Received ice candidate {obj}")
await pc.addIceCandidate(obj)

else:
message = message["ice"]
if str(message["candidate"]) == "":
self.logger.info("Received empty candidate, ignoring")
return None

obj = candidate_from_sdp(str(message["candidate"]).split(":", 1)[1])
obj.sdpMLineIndex = message["sdpMLineIndex"]

self.logger.info(f"Received ice candidate {obj}")
await pc.addIceCandidate(obj)

async def close_session(self, session_id: str) -> None:
session = self.sessions.pop(session_id)
Expand Down
2 changes: 1 addition & 1 deletion src/gst_signalling/gst_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def __init__(self, host: str, port: int, name: str) -> None:
GstSignallingAbstractRole.__init__(self, host=host, port=port)
self.name = name

@self.signalling.on("PeerStatusChanged")
@self.signalling.on("PeerStatusChanged") # type: ignore[arg-type]
def on_peer_status_changed(
peer_id: str,
roles: List[str],
Expand Down
2 changes: 1 addition & 1 deletion src/gst_signalling/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def get_list() -> Dict[str, Dict[str, str]]:
signalling = GstSignalling(host=host, port=port)
await signalling.connect()

@signalling.on("List")
@signalling.on("List") # type: ignore[arg-type]
def on_list(found_producers: Dict[str, Dict[str, str]]) -> None:
producers.update(found_producers)
got_it.set()
Expand Down

0 comments on commit 7dbe4e2

Please sign in to comment.