From 268a217e9aca88aea3ec5ea378481be8899817c9 Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Sun, 21 Jan 2024 19:56:51 +0100 Subject: [PATCH] enhancement #36: save draft code --- setup.cfg | 1 + .../consumer.py | 6 +- .../producer.py | 11 +- src/gst_signalling/__init__.py | 3 +- src/gst_signalling/gst_abstract_role.py | 126 ++++++++++-------- src/gst_signalling/gst_consumer.py | 75 ++++++++++- src/gst_signalling/gst_producer.py | 73 ++++++++-- src/gst_signalling/gst_signalling.py | 7 +- 8 files changed, 226 insertions(+), 76 deletions(-) diff --git a/setup.cfg b/setup.cfg index 92de1e6..39a38f4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -20,6 +20,7 @@ install_requires = numpy pyee websockets + PyGObject==3.42.2 [options.packages.find] where=src diff --git a/src/example/datachannel-single-producer-multiple-consumer/consumer.py b/src/example/datachannel-single-producer-multiple-consumer/consumer.py index cb1a205..d715371 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/consumer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/consumer.py @@ -1,10 +1,11 @@ import argparse import asyncio import logging +import os from aiortc import RTCDataChannel -from gst_signalling import GstSession, GstSignallingConsumer +from gst_signalling import GstSignallingConsumer from gst_signalling.utils import find_producer_peer_id_by_name @@ -21,6 +22,7 @@ def main(args: argparse.Namespace) -> None: producer_peer_id=peer_id, ) + """ @consumer.on("new_session") # type: ignore[misc] def on_new_session(session: GstSession) -> None: pc = session.pc @@ -35,6 +37,7 @@ def on_message(message: str) -> None: @consumer.on("close_session") # type: ignore[misc] def on_close_session(session: GstSession) -> None: close_evt.set() + """ async def run_consumer(consumer: GstSignallingConsumer) -> None: await consumer.connect() @@ -68,5 +71,6 @@ async def run_consumer(consumer: GstSignallingConsumer) -> None: logging.basicConfig(level=logging.INFO) elif args.verbose > 1: logging.basicConfig(level=logging.DEBUG) + os.environ["GST_DEBUG"] = "4" main(args) diff --git a/src/example/datachannel-single-producer-multiple-consumer/producer.py b/src/example/datachannel-single-producer-multiple-consumer/producer.py index 6bb454d..4cfc9c7 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/producer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/producer.py @@ -1,11 +1,12 @@ import argparse import asyncio import logging +import os import time import aiortc -from gst_signalling import GstSession, GstSignallingProducer +from gst_signalling import GstSignallingProducer def main(args: argparse.Namespace) -> None: @@ -15,6 +16,9 @@ def main(args: argparse.Namespace) -> None: name=args.name, ) + freq_hz = 10 + + """ @producer.on("new_session") # type: ignore[misc] def on_new_session(session: GstSession) -> None: pc = session.pc @@ -32,14 +36,14 @@ async def send_pings() -> None: while True: dt = time.time() - t0 channel.send(f"ping: {dt:.1f}s") - await asyncio.sleep(1) + await asyncio.sleep(1.0 / freq_hz) except aiortc.exceptions.InvalidStateError: print("Channel closed") @channel.on("open") # type: ignore[misc] def on_open() -> None: asyncio.ensure_future(send_pings()) - + """ # run event loop loop = asyncio.get_event_loop() try: @@ -66,5 +70,6 @@ def on_open() -> None: logging.basicConfig(level=logging.INFO) elif args.verbose > 1: logging.basicConfig(level=logging.DEBUG) + os.environ["GST_DEBUG"] = "4" main(args) diff --git a/src/gst_signalling/__init__.py b/src/gst_signalling/__init__.py index 74b2f28..e26d838 100644 --- a/src/gst_signalling/__init__.py +++ b/src/gst_signalling/__init__.py @@ -1,5 +1,6 @@ from .aiortc_adapter import GstSignalingForAiortc # noqa: F401 -from .gst_abstract_role import GstSession # noqa: F401 + +# from .gst_abstract_role import GstSession # noqa: F401 from .gst_consumer import GstSignallingConsumer # noqa: F401 from .gst_listener import GstSignallingListener # noqa: F401 from .gst_producer import GstSignallingProducer # noqa: F401 diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index bd7a53a..86def16 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -1,26 +1,22 @@ -from aiortc import ( - RTCIceCandidate, - RTCPeerConnection, - RTCSessionDescription, -) -from aiortc.contrib.signaling import object_from_string, object_to_string import asyncio -import json import logging -import pyee from typing import Any, Dict, NamedTuple, Optional -from aiortc.sdp import candidate_from_sdp - +import gi +import pyee from .gst_signalling import GstSignalling +gi.require_version("Gst", "1.0") +gi.require_version("GstWebRTC", "1.0") + +from gi.repository import Gst GstSession = NamedTuple( "GstSession", [ ("peer_id", str), - ("pc", RTCPeerConnection), + ("pc", Gst.Element), # type '__gi__.GstWebRTCBin' ], ) @@ -39,6 +35,7 @@ def __init__( self.peer_id: Optional[str] = None self.peer_id_evt = asyncio.Event() + self._asyncloop = asyncio.get_event_loop() self.sessions: Dict[str, GstSession] = {} @@ -71,6 +68,52 @@ async def on_end_session(session_id: str) -> None: self.signalling = signalling + Gst.init(None) + + self._pipeline = Gst.Pipeline.new() + + def __del__(self) -> None: + Gst.deinit() + + def make_send_sdp(self, sdp, type: str, session_id: str): # type: ignore[no-untyped-def] + self.logger.debug(f"send sdp {type} {sdp}") + text = sdp.sdp.as_text() + msg = {"type": type, "sdp": text} + asyncio.run_coroutine_threadsafe( + self.send_sdp(session_id, msg), self._asyncloop + ) + + def send_ice_candidate_message(self, _, mlineindex, candidate): # type: ignore[no-untyped-def] + icemsg = {"ice": {"candidate": candidate, "sdpMLineIndex": mlineindex}} + self.logger.debug(icemsg) + # loop = asyncio.new_event_loop() + # loop.run_until_complete(self.send_sdp(session_id, icemsg)) + + def init_webrtc(self, session_id: str): # type: ignore[no-untyped-def] + webrtc = Gst.ElementFactory.make("webrtcbin") + assert webrtc + + # webrtc.set_property("bundle-policy", "max-bundle") + # webrtc.set_property("stun-server", "stun://stun.l.google.com:19302") + # webrtc.set_property( + # "turn-server", + # "turn://gstreamer:IsGreatWhenYouCanGetItToWork@webrtc.nirbheek.in:3478", + # ) + webrtc.set_property("stun-server", None) + webrtc.set_property("turn-server", None) + + # webrtc.connect("on-ice-candidate", lambda *args: self._ices.append(args)) + # webrtc.connect("on-negotiation-needed", self.on_negotiation_needed, session_id) + webrtc.connect("on-ice-candidate", self.send_ice_candidate_message) + # webrtc.connect( + # "notify::ice-gathering-state", self.on_ice_gathering_state_notify + # ) + + self._pipeline.set_state(Gst.State.READY) + self._pipeline.add(webrtc) + + return webrtc + async def connect(self) -> None: assert self.signalling is not None @@ -86,67 +129,32 @@ async def consume(self) -> None: # Session management async def setup_session(self, session_id: str, peer_id: str) -> GstSession: - pc = RTCPeerConnection() + self.logger.info("setup session") + pc = self.init_webrtc(session_id) session = GstSession(peer_id, pc) - self.sessions[session_id] = session - self.emit("new_session", session) + self.sessions[session_id] = session return session async def peer_for_session( - self, session_id: str, message: Dict[str, Dict[str, Any]] + self, session_id: str, message: Dict[str, Dict[str, str]] ) -> None: - session = self.sessions[session_id] - pc = session.pc - - if "sdp" in message: - obj = object_from_string(json.dumps(message["sdp"])) - - if isinstance(obj, RTCSessionDescription): - if obj.type == "offer": - self.logger.info("Received offer sdp") - await pc.setRemoteDescription(obj) - - self.logger.info("Sending answer") - await pc.setLocalDescription(await pc.createAnswer()) - - self.logger.info("Sending answer sdp") - await self.send_sdp(session_id, pc.localDescription) - - elif obj.type == "answer": - self.logger.info("Received answer sdp") - await pc.setRemoteDescription(obj) - - elif "ice" in message: - if "type" in message and str(message["type"]) == "candidate": - obj = object_from_string(json.dumps(message["ice"])) - - if isinstance(obj, RTCIceCandidate): - self.logger.info(f"Received ice candidate {obj}") - await pc.addIceCandidate(obj) - - else: - message = message["ice"] - if str(message["candidate"]) == "": - self.logger.info("Received empty candidate, ignoring") - return None - - obj = candidate_from_sdp(str(message["candidate"]).split(":", 1)[1]) - obj.sdpMLineIndex = message["sdpMLineIndex"] - - self.logger.info(f"Received ice candidate {obj}") - await pc.addIceCandidate(obj) + self.logger.info(f"peer for session {session_id} {message}") async def close_session(self, session_id: str) -> None: + self.logger.info("close session") + """ session = self.sessions.pop(session_id) self.emit("close_session", session) await session.pc.close() + """ - async def send_sdp(self, session_id: str, sdp: RTCSessionDescription) -> None: - await self.signalling.send_peer_message( - session_id, "sdp", json.loads(object_to_string(sdp)) - ) + async def send_sdp(self, session_id: str, sdp: Dict[str, Dict[str, str]]) -> None: + await self.signalling.send_peer_message(session_id, "sdp", sdp) + + async def send_ice(self, session_id: str, ice: Dict[str, Dict[str, str]]) -> None: + await self.signalling.send_peer_message(session_id, "ice", ice) diff --git a/src/gst_signalling/gst_consumer.py b/src/gst_signalling/gst_consumer.py index 8d61ad0..ba2d2a6 100644 --- a/src/gst_signalling/gst_consumer.py +++ b/src/gst_signalling/gst_consumer.py @@ -1,4 +1,17 @@ -from .gst_abstract_role import GstSignallingAbstractRole +import logging +from typing import Dict + +from gi.repository import Gst, GstSdp, GstWebRTC + +from .gst_abstract_role import GstSession, GstSignallingAbstractRole + +""" +try: + from gi.overrides import Gst as _ +except ImportError: + print("gstreamer-python binding overrides aren't available, please install them") + raise +""" class GstSignallingConsumer(GstSignallingAbstractRole): @@ -9,8 +22,68 @@ def __init__( producer_peer_id: str, ) -> None: super().__init__(host, port) + self.logger = logging.getLogger(__name__) self.producer_peer_id = producer_peer_id async def connect(self) -> None: await super().connect() await self.signalling.start_session(self.producer_peer_id) + self.logger.info("connect") + + async def setup_session(self, session_id: str, peer_id: str) -> GstSession: + session = await super().setup_session(session_id, peer_id) + self.logger.info("setup session consumer") + + self._pipeline.set_state(Gst.State.PLAYING) + self.emit("new_session", session) + + return session + + def on_answer_created( + self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str + ) -> None: + assert promise.wait() == Gst.PromiseResult.REPLIED + reply = promise.get_reply() + answer = reply["answer"] + promise = Gst.Promise.new() + webrtc.emit("set-local-description", answer, promise) + promise.interrupt() # we don't care about the result, discard it + # self.send_sdp(answer) + self.logger.debug(f"here {answer}") + self.make_send_sdp(answer, "answer", session_id) + + def on_offer_set( + self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str + ) -> None: + assert promise.wait() == Gst.PromiseResult.REPLIED + promise = Gst.Promise.new_with_change_func( + self.on_answer_created, webrtc, session_id + ) + webrtc.emit("create-answer", None, promise) + + async def peer_for_session( + self, session_id: str, message: Dict[str, Dict[str, str]] + ) -> None: + self.logger.info(f"peer for session {session_id} {message}") + + session = self.sessions[session_id] + webrtc = session.pc + + if "sdp" in message: + if message["sdp"]["type"] == "offer": + _, sdpmsg = GstSdp.SDPMessage.new_from_text(message["sdp"]["sdp"]) + sdp_type = GstWebRTC.WebRTCSDPType.OFFER + offer = GstWebRTC.WebRTCSessionDescription.new(sdp_type, sdpmsg) + promise = Gst.Promise.new_with_change_func( + self.on_offer_set, webrtc, session_id + ) + webrtc.emit("set-remote-description", offer, promise) + self.logger.debug("set remote desc done") + + elif message["sdp"]["type"] == "answer": + self.logger.warning("Consumer should not receive the answer") + else: + self.logger.error(f"SDP not properly formatted {message['sdp']}") + + else: + self.logger.error(f"message not processed {message}") diff --git a/src/gst_signalling/gst_producer.py b/src/gst_signalling/gst_producer.py index a37c453..69fe481 100644 --- a/src/gst_signalling/gst_producer.py +++ b/src/gst_signalling/gst_producer.py @@ -1,15 +1,16 @@ +import logging +from typing import Any, Dict + +from gi.repository import Gst, GstSdp, GstWebRTC + from .gst_abstract_role import GstSession, GstSignallingAbstractRole class GstSignallingProducer(GstSignallingAbstractRole): - def __init__( - self, - host: str, - port: int, - name: str, - ) -> None: + def __init__(self, host: str, port: int, name: str) -> None: super().__init__(host, port) self.name = name + self.logger = logging.getLogger(__name__) async def connect(self) -> None: await super().connect() @@ -19,12 +20,68 @@ async def serve4ever(self) -> None: await self.connect() await self.consume() + def on_offer_created(self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str): # type: ignore[no-untyped-def] + self.logger.debug(f"on offer created {promise} {webrtc} {session_id}") + assert promise.wait() == Gst.PromiseResult.REPLIED + reply = promise.get_reply() + offer = reply["offer"] + + promise = Gst.Promise.new() + self.logger.info("Offer created, setting local description") + webrtc.emit("set-local-description", offer, promise) + promise.interrupt() + self.make_send_sdp(offer, "offer", session_id) + + def on_negotiation_needed(self, element, session_id): # type: ignore[no-untyped-def] + self.logger.debug(f"on negociation needed {element} {session_id}") + promise = Gst.Promise.new_with_change_func( + self.on_offer_created, element, session_id + ) + element.emit("create-offer", None, promise) + async def setup_session(self, session_id: str, peer_id: str) -> GstSession: session = await super().setup_session(session_id, peer_id) + self.logger.info("setup session producer") # send offer pc = session.pc - await pc.setLocalDescription(await pc.createOffer()) - await self.send_sdp(session_id, pc.localDescription) + pc.connect("on-negotiation-needed", self.on_negotiation_needed, session_id) + + pc.connect("on-data-channel", self.on_data_channel) + + self._pipeline.set_state(Gst.State.PLAYING) + + self.emit("new_session", session) return session + + def on_data_channel(self, webrtc, channel): + self.logger.info("data_channel created") + + async def peer_for_session( + self, session_id: str, message: Dict[str, Dict[str, str]] + ) -> None: + self.logger.info(f"peer for session {session_id} {message}") + + session = self.sessions[session_id] + webrtc = session.pc + + if "sdp" in message: + if message["sdp"]["type"] == "answer": + self.logger.debug("set remote desc") + _, sdpmsg = GstSdp.SDPMessage.new_from_text(message["sdp"]["sdp"]) + + sdp_type = GstWebRTC.WebRTCSDPType.ANSWER + answer = GstWebRTC.WebRTCSessionDescription.new(sdp_type, sdpmsg) + + promise = Gst.Promise.new() + webrtc.emit("set-remote-description", answer, promise) + promise.interrupt() + self.logger.debug("set remote desc done") + elif message["sdp"]["type"] == "offer": + self.logger.warning("producer should not receive the offer") + else: + self.logger.error(f"SDP not properly formatted {message['sdp']}") + + else: + self.logger.error(f"message not processed {message}") diff --git a/src/gst_signalling/gst_signalling.py b/src/gst_signalling/gst_signalling.py index 3b9acc7..8fb2564 100644 --- a/src/gst_signalling/gst_signalling.py +++ b/src/gst_signalling/gst_signalling.py @@ -1,9 +1,10 @@ import asyncio import json import logging -import pyee from typing import Any, Dict, List, Optional -from websockets.legacy.client import connect, WebSocketClientProtocol + +import pyee +from websockets.legacy.client import WebSocketClientProtocol, connect class GstSignalling(pyee.AsyncIOEventEmitter): @@ -201,7 +202,7 @@ async def end_session(self, session_id: str) -> None: await self._send(message) async def send_peer_message( - self, session_id: str, type: str, peer_message: str + self, session_id: str, type: str, peer_message: Dict[str, Dict[str, str]] ) -> None: """Sends a message to a peer the sender is currently in session with.