generated from pollen-robotics/python-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gst_producer.py
78 lines (60 loc) · 3.11 KB
/
gst_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import logging
from typing import Dict
from gi.repository import Gst, GstSdp, GstWebRTC
from .gst_abstract_role import GstSession, GstSignallingAbstractRole
class GstSignallingProducer(GstSignallingAbstractRole):
def __init__(self, host: str, port: int, name: str) -> None:
super().__init__(host, port)
self.name = name
self.logger = logging.getLogger(__name__)
async def connect(self) -> None:
await super().connect()
await self.signalling.set_peer_status(roles=["producer"], name=self.name)
async def serve4ever(self) -> None:
await self.connect()
await self.consume()
def on_offer_created(self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str) -> None:
self.logger.debug(f"on offer created {promise} {webrtc} {session_id}")
assert promise.wait() == Gst.PromiseResult.REPLIED
reply = promise.get_reply()
offer = reply.get_value("offer") # type: ignore[union-attr]
promise = Gst.Promise.new()
self.logger.info("Offer created, setting local description")
webrtc.emit("set-local-description", offer, promise)
promise.interrupt()
self.make_send_sdp(offer, "offer", session_id)
def on_negotiation_needed(self, element: Gst.Element, session_id: str) -> None:
self.logger.debug(f"on negociation needed {element} {session_id}")
promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, session_id)
element.emit("create-offer", None, promise)
async def setup_session(self, session_id: str, peer_id: str) -> GstSession:
session = await super().setup_session(session_id, peer_id)
self.logger.info("setup session producer")
# send offer
pc = session.pc
pc.connect("on-negotiation-needed", self.on_negotiation_needed, session_id)
pc.sync_state_with_parent()
self.emit("new_session", session)
return session
async def peer_for_session(self, session_id: str, message: Dict[str, Dict[str, str]]) -> None:
self.logger.info(f"peer for session {session_id} {message}")
session = self.sessions[session_id]
webrtc = session.pc
if "sdp" in message:
if message["sdp"]["type"] == "answer":
self.logger.debug("set remote desc")
_, sdpmsg = GstSdp.SDPMessage.new_from_text(message["sdp"]["sdp"])
sdp_type = GstWebRTC.WebRTCSDPType.ANSWER
answer = GstWebRTC.WebRTCSessionDescription.new(sdp_type, sdpmsg)
promise = Gst.Promise.new()
webrtc.emit("set-remote-description", answer, promise)
promise.interrupt()
self.logger.debug("set remote desc done")
elif message["sdp"]["type"] == "offer":
self.logger.warning("producer should not receive the offer")
else:
self.logger.error(f"SDP not properly formatted {message['sdp']}")
elif "ice" in message:
self.handle_ice_message(webrtc, message["ice"])
else:
self.logger.error(f"message not processed {message}")