From bac440339417f8e83eef62df297dcc037358945e Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Fri, 5 Jan 2024 11:15:27 +0100 Subject: [PATCH 01/18] enhancement #36: make aiortc data example work --- .../consumer.py | 5 ++++- .../producer.py | 7 ++++++- src/gst_signalling/utils.py | 4 +++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/example/datachannel-single-producer-multiple-consumer/consumer.py b/src/example/datachannel-single-producer-multiple-consumer/consumer.py index c4c9fbe..cb1a205 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/consumer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/consumer.py @@ -1,7 +1,9 @@ -from aiortc import RTCDataChannel import argparse import asyncio import logging + +from aiortc import RTCDataChannel + from gst_signalling import GstSession, GstSignallingConsumer from gst_signalling.utils import find_producer_peer_id_by_name @@ -28,6 +30,7 @@ def on_datachannel(channel: RTCDataChannel) -> None: @channel.on("message") # type: ignore[misc] def on_message(message: str) -> None: print("received message:", message) + channel.send("pong") @consumer.on("close_session") # type: ignore[misc] def on_close_session(session: GstSession) -> None: diff --git a/src/example/datachannel-single-producer-multiple-consumer/producer.py b/src/example/datachannel-single-producer-multiple-consumer/producer.py index 2798241..6bb454d 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/producer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/producer.py @@ -1,9 +1,10 @@ -import aiortc import argparse import asyncio import logging import time +import aiortc + from gst_signalling import GstSession, GstSignallingProducer @@ -20,6 +21,10 @@ def on_new_session(session: GstSession) -> None: channel = pc.createDataChannel("chat") + @channel.on("message") # type: ignore[misc] + def on_message(message: str) -> None: + print("received message:", message) + async def send_pings() -> None: try: t0 = time.time() diff --git a/src/gst_signalling/utils.py b/src/gst_signalling/utils.py index 33f34c8..f9197e9 100644 --- a/src/gst_signalling/utils.py +++ b/src/gst_signalling/utils.py @@ -34,7 +34,9 @@ def on_list(found_producers: Dict[str, Dict[str, str]]) -> None: return producers - return asyncio.run(get_list()) + # return asyncio.run(get_list()) + loop = asyncio.get_event_loop() + return loop.run_until_complete(get_list()) def find_producer_peer_id_by_name(host: str, port: int, name: str) -> str: From 268a217e9aca88aea3ec5ea378481be8899817c9 Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Sun, 21 Jan 2024 19:56:51 +0100 Subject: [PATCH 02/18] enhancement #36: save draft code --- setup.cfg | 1 + .../consumer.py | 6 +- .../producer.py | 11 +- src/gst_signalling/__init__.py | 3 +- src/gst_signalling/gst_abstract_role.py | 126 ++++++++++-------- src/gst_signalling/gst_consumer.py | 75 ++++++++++- src/gst_signalling/gst_producer.py | 73 ++++++++-- src/gst_signalling/gst_signalling.py | 7 +- 8 files changed, 226 insertions(+), 76 deletions(-) diff --git a/setup.cfg b/setup.cfg index 92de1e6..39a38f4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -20,6 +20,7 @@ install_requires = numpy pyee websockets + PyGObject==3.42.2 [options.packages.find] where=src diff --git a/src/example/datachannel-single-producer-multiple-consumer/consumer.py b/src/example/datachannel-single-producer-multiple-consumer/consumer.py index cb1a205..d715371 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/consumer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/consumer.py @@ -1,10 +1,11 @@ import argparse import asyncio import logging +import os from aiortc import RTCDataChannel -from gst_signalling import GstSession, GstSignallingConsumer +from gst_signalling import GstSignallingConsumer from gst_signalling.utils import find_producer_peer_id_by_name @@ -21,6 +22,7 @@ def main(args: argparse.Namespace) -> None: producer_peer_id=peer_id, ) + """ @consumer.on("new_session") # type: ignore[misc] def on_new_session(session: GstSession) -> None: pc = session.pc @@ -35,6 +37,7 @@ def on_message(message: str) -> None: @consumer.on("close_session") # type: ignore[misc] def on_close_session(session: GstSession) -> None: close_evt.set() + """ async def run_consumer(consumer: GstSignallingConsumer) -> None: await consumer.connect() @@ -68,5 +71,6 @@ async def run_consumer(consumer: GstSignallingConsumer) -> None: logging.basicConfig(level=logging.INFO) elif args.verbose > 1: logging.basicConfig(level=logging.DEBUG) + os.environ["GST_DEBUG"] = "4" main(args) diff --git a/src/example/datachannel-single-producer-multiple-consumer/producer.py b/src/example/datachannel-single-producer-multiple-consumer/producer.py index 6bb454d..4cfc9c7 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/producer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/producer.py @@ -1,11 +1,12 @@ import argparse import asyncio import logging +import os import time import aiortc -from gst_signalling import GstSession, GstSignallingProducer +from gst_signalling import GstSignallingProducer def main(args: argparse.Namespace) -> None: @@ -15,6 +16,9 @@ def main(args: argparse.Namespace) -> None: name=args.name, ) + freq_hz = 10 + + """ @producer.on("new_session") # type: ignore[misc] def on_new_session(session: GstSession) -> None: pc = session.pc @@ -32,14 +36,14 @@ async def send_pings() -> None: while True: dt = time.time() - t0 channel.send(f"ping: {dt:.1f}s") - await asyncio.sleep(1) + await asyncio.sleep(1.0 / freq_hz) except aiortc.exceptions.InvalidStateError: print("Channel closed") @channel.on("open") # type: ignore[misc] def on_open() -> None: asyncio.ensure_future(send_pings()) - + """ # run event loop loop = asyncio.get_event_loop() try: @@ -66,5 +70,6 @@ def on_open() -> None: logging.basicConfig(level=logging.INFO) elif args.verbose > 1: logging.basicConfig(level=logging.DEBUG) + os.environ["GST_DEBUG"] = "4" main(args) diff --git a/src/gst_signalling/__init__.py b/src/gst_signalling/__init__.py index 74b2f28..e26d838 100644 --- a/src/gst_signalling/__init__.py +++ b/src/gst_signalling/__init__.py @@ -1,5 +1,6 @@ from .aiortc_adapter import GstSignalingForAiortc # noqa: F401 -from .gst_abstract_role import GstSession # noqa: F401 + +# from .gst_abstract_role import GstSession # noqa: F401 from .gst_consumer import GstSignallingConsumer # noqa: F401 from .gst_listener import GstSignallingListener # noqa: F401 from .gst_producer import GstSignallingProducer # noqa: F401 diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index bd7a53a..86def16 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -1,26 +1,22 @@ -from aiortc import ( - RTCIceCandidate, - RTCPeerConnection, - RTCSessionDescription, -) -from aiortc.contrib.signaling import object_from_string, object_to_string import asyncio -import json import logging -import pyee from typing import Any, Dict, NamedTuple, Optional -from aiortc.sdp import candidate_from_sdp - +import gi +import pyee from .gst_signalling import GstSignalling +gi.require_version("Gst", "1.0") +gi.require_version("GstWebRTC", "1.0") + +from gi.repository import Gst GstSession = NamedTuple( "GstSession", [ ("peer_id", str), - ("pc", RTCPeerConnection), + ("pc", Gst.Element), # type '__gi__.GstWebRTCBin' ], ) @@ -39,6 +35,7 @@ def __init__( self.peer_id: Optional[str] = None self.peer_id_evt = asyncio.Event() + self._asyncloop = asyncio.get_event_loop() self.sessions: Dict[str, GstSession] = {} @@ -71,6 +68,52 @@ async def on_end_session(session_id: str) -> None: self.signalling = signalling + Gst.init(None) + + self._pipeline = Gst.Pipeline.new() + + def __del__(self) -> None: + Gst.deinit() + + def make_send_sdp(self, sdp, type: str, session_id: str): # type: ignore[no-untyped-def] + self.logger.debug(f"send sdp {type} {sdp}") + text = sdp.sdp.as_text() + msg = {"type": type, "sdp": text} + asyncio.run_coroutine_threadsafe( + self.send_sdp(session_id, msg), self._asyncloop + ) + + def send_ice_candidate_message(self, _, mlineindex, candidate): # type: ignore[no-untyped-def] + icemsg = {"ice": {"candidate": candidate, "sdpMLineIndex": mlineindex}} + self.logger.debug(icemsg) + # loop = asyncio.new_event_loop() + # loop.run_until_complete(self.send_sdp(session_id, icemsg)) + + def init_webrtc(self, session_id: str): # type: ignore[no-untyped-def] + webrtc = Gst.ElementFactory.make("webrtcbin") + assert webrtc + + # webrtc.set_property("bundle-policy", "max-bundle") + # webrtc.set_property("stun-server", "stun://stun.l.google.com:19302") + # webrtc.set_property( + # "turn-server", + # "turn://gstreamer:IsGreatWhenYouCanGetItToWork@webrtc.nirbheek.in:3478", + # ) + webrtc.set_property("stun-server", None) + webrtc.set_property("turn-server", None) + + # webrtc.connect("on-ice-candidate", lambda *args: self._ices.append(args)) + # webrtc.connect("on-negotiation-needed", self.on_negotiation_needed, session_id) + webrtc.connect("on-ice-candidate", self.send_ice_candidate_message) + # webrtc.connect( + # "notify::ice-gathering-state", self.on_ice_gathering_state_notify + # ) + + self._pipeline.set_state(Gst.State.READY) + self._pipeline.add(webrtc) + + return webrtc + async def connect(self) -> None: assert self.signalling is not None @@ -86,67 +129,32 @@ async def consume(self) -> None: # Session management async def setup_session(self, session_id: str, peer_id: str) -> GstSession: - pc = RTCPeerConnection() + self.logger.info("setup session") + pc = self.init_webrtc(session_id) session = GstSession(peer_id, pc) - self.sessions[session_id] = session - self.emit("new_session", session) + self.sessions[session_id] = session return session async def peer_for_session( - self, session_id: str, message: Dict[str, Dict[str, Any]] + self, session_id: str, message: Dict[str, Dict[str, str]] ) -> None: - session = self.sessions[session_id] - pc = session.pc - - if "sdp" in message: - obj = object_from_string(json.dumps(message["sdp"])) - - if isinstance(obj, RTCSessionDescription): - if obj.type == "offer": - self.logger.info("Received offer sdp") - await pc.setRemoteDescription(obj) - - self.logger.info("Sending answer") - await pc.setLocalDescription(await pc.createAnswer()) - - self.logger.info("Sending answer sdp") - await self.send_sdp(session_id, pc.localDescription) - - elif obj.type == "answer": - self.logger.info("Received answer sdp") - await pc.setRemoteDescription(obj) - - elif "ice" in message: - if "type" in message and str(message["type"]) == "candidate": - obj = object_from_string(json.dumps(message["ice"])) - - if isinstance(obj, RTCIceCandidate): - self.logger.info(f"Received ice candidate {obj}") - await pc.addIceCandidate(obj) - - else: - message = message["ice"] - if str(message["candidate"]) == "": - self.logger.info("Received empty candidate, ignoring") - return None - - obj = candidate_from_sdp(str(message["candidate"]).split(":", 1)[1]) - obj.sdpMLineIndex = message["sdpMLineIndex"] - - self.logger.info(f"Received ice candidate {obj}") - await pc.addIceCandidate(obj) + self.logger.info(f"peer for session {session_id} {message}") async def close_session(self, session_id: str) -> None: + self.logger.info("close session") + """ session = self.sessions.pop(session_id) self.emit("close_session", session) await session.pc.close() + """ - async def send_sdp(self, session_id: str, sdp: RTCSessionDescription) -> None: - await self.signalling.send_peer_message( - session_id, "sdp", json.loads(object_to_string(sdp)) - ) + async def send_sdp(self, session_id: str, sdp: Dict[str, Dict[str, str]]) -> None: + await self.signalling.send_peer_message(session_id, "sdp", sdp) + + async def send_ice(self, session_id: str, ice: Dict[str, Dict[str, str]]) -> None: + await self.signalling.send_peer_message(session_id, "ice", ice) diff --git a/src/gst_signalling/gst_consumer.py b/src/gst_signalling/gst_consumer.py index 8d61ad0..ba2d2a6 100644 --- a/src/gst_signalling/gst_consumer.py +++ b/src/gst_signalling/gst_consumer.py @@ -1,4 +1,17 @@ -from .gst_abstract_role import GstSignallingAbstractRole +import logging +from typing import Dict + +from gi.repository import Gst, GstSdp, GstWebRTC + +from .gst_abstract_role import GstSession, GstSignallingAbstractRole + +""" +try: + from gi.overrides import Gst as _ +except ImportError: + print("gstreamer-python binding overrides aren't available, please install them") + raise +""" class GstSignallingConsumer(GstSignallingAbstractRole): @@ -9,8 +22,68 @@ def __init__( producer_peer_id: str, ) -> None: super().__init__(host, port) + self.logger = logging.getLogger(__name__) self.producer_peer_id = producer_peer_id async def connect(self) -> None: await super().connect() await self.signalling.start_session(self.producer_peer_id) + self.logger.info("connect") + + async def setup_session(self, session_id: str, peer_id: str) -> GstSession: + session = await super().setup_session(session_id, peer_id) + self.logger.info("setup session consumer") + + self._pipeline.set_state(Gst.State.PLAYING) + self.emit("new_session", session) + + return session + + def on_answer_created( + self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str + ) -> None: + assert promise.wait() == Gst.PromiseResult.REPLIED + reply = promise.get_reply() + answer = reply["answer"] + promise = Gst.Promise.new() + webrtc.emit("set-local-description", answer, promise) + promise.interrupt() # we don't care about the result, discard it + # self.send_sdp(answer) + self.logger.debug(f"here {answer}") + self.make_send_sdp(answer, "answer", session_id) + + def on_offer_set( + self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str + ) -> None: + assert promise.wait() == Gst.PromiseResult.REPLIED + promise = Gst.Promise.new_with_change_func( + self.on_answer_created, webrtc, session_id + ) + webrtc.emit("create-answer", None, promise) + + async def peer_for_session( + self, session_id: str, message: Dict[str, Dict[str, str]] + ) -> None: + self.logger.info(f"peer for session {session_id} {message}") + + session = self.sessions[session_id] + webrtc = session.pc + + if "sdp" in message: + if message["sdp"]["type"] == "offer": + _, sdpmsg = GstSdp.SDPMessage.new_from_text(message["sdp"]["sdp"]) + sdp_type = GstWebRTC.WebRTCSDPType.OFFER + offer = GstWebRTC.WebRTCSessionDescription.new(sdp_type, sdpmsg) + promise = Gst.Promise.new_with_change_func( + self.on_offer_set, webrtc, session_id + ) + webrtc.emit("set-remote-description", offer, promise) + self.logger.debug("set remote desc done") + + elif message["sdp"]["type"] == "answer": + self.logger.warning("Consumer should not receive the answer") + else: + self.logger.error(f"SDP not properly formatted {message['sdp']}") + + else: + self.logger.error(f"message not processed {message}") diff --git a/src/gst_signalling/gst_producer.py b/src/gst_signalling/gst_producer.py index a37c453..69fe481 100644 --- a/src/gst_signalling/gst_producer.py +++ b/src/gst_signalling/gst_producer.py @@ -1,15 +1,16 @@ +import logging +from typing import Any, Dict + +from gi.repository import Gst, GstSdp, GstWebRTC + from .gst_abstract_role import GstSession, GstSignallingAbstractRole class GstSignallingProducer(GstSignallingAbstractRole): - def __init__( - self, - host: str, - port: int, - name: str, - ) -> None: + def __init__(self, host: str, port: int, name: str) -> None: super().__init__(host, port) self.name = name + self.logger = logging.getLogger(__name__) async def connect(self) -> None: await super().connect() @@ -19,12 +20,68 @@ async def serve4ever(self) -> None: await self.connect() await self.consume() + def on_offer_created(self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str): # type: ignore[no-untyped-def] + self.logger.debug(f"on offer created {promise} {webrtc} {session_id}") + assert promise.wait() == Gst.PromiseResult.REPLIED + reply = promise.get_reply() + offer = reply["offer"] + + promise = Gst.Promise.new() + self.logger.info("Offer created, setting local description") + webrtc.emit("set-local-description", offer, promise) + promise.interrupt() + self.make_send_sdp(offer, "offer", session_id) + + def on_negotiation_needed(self, element, session_id): # type: ignore[no-untyped-def] + self.logger.debug(f"on negociation needed {element} {session_id}") + promise = Gst.Promise.new_with_change_func( + self.on_offer_created, element, session_id + ) + element.emit("create-offer", None, promise) + async def setup_session(self, session_id: str, peer_id: str) -> GstSession: session = await super().setup_session(session_id, peer_id) + self.logger.info("setup session producer") # send offer pc = session.pc - await pc.setLocalDescription(await pc.createOffer()) - await self.send_sdp(session_id, pc.localDescription) + pc.connect("on-negotiation-needed", self.on_negotiation_needed, session_id) + + pc.connect("on-data-channel", self.on_data_channel) + + self._pipeline.set_state(Gst.State.PLAYING) + + self.emit("new_session", session) return session + + def on_data_channel(self, webrtc, channel): + self.logger.info("data_channel created") + + async def peer_for_session( + self, session_id: str, message: Dict[str, Dict[str, str]] + ) -> None: + self.logger.info(f"peer for session {session_id} {message}") + + session = self.sessions[session_id] + webrtc = session.pc + + if "sdp" in message: + if message["sdp"]["type"] == "answer": + self.logger.debug("set remote desc") + _, sdpmsg = GstSdp.SDPMessage.new_from_text(message["sdp"]["sdp"]) + + sdp_type = GstWebRTC.WebRTCSDPType.ANSWER + answer = GstWebRTC.WebRTCSessionDescription.new(sdp_type, sdpmsg) + + promise = Gst.Promise.new() + webrtc.emit("set-remote-description", answer, promise) + promise.interrupt() + self.logger.debug("set remote desc done") + elif message["sdp"]["type"] == "offer": + self.logger.warning("producer should not receive the offer") + else: + self.logger.error(f"SDP not properly formatted {message['sdp']}") + + else: + self.logger.error(f"message not processed {message}") diff --git a/src/gst_signalling/gst_signalling.py b/src/gst_signalling/gst_signalling.py index 3b9acc7..8fb2564 100644 --- a/src/gst_signalling/gst_signalling.py +++ b/src/gst_signalling/gst_signalling.py @@ -1,9 +1,10 @@ import asyncio import json import logging -import pyee from typing import Any, Dict, List, Optional -from websockets.legacy.client import connect, WebSocketClientProtocol + +import pyee +from websockets.legacy.client import WebSocketClientProtocol, connect class GstSignalling(pyee.AsyncIOEventEmitter): @@ -201,7 +202,7 @@ async def end_session(self, session_id: str) -> None: await self._send(message) async def send_peer_message( - self, session_id: str, type: str, peer_message: str + self, session_id: str, type: str, peer_message: Dict[str, Dict[str, str]] ) -> None: """Sends a message to a peer the sender is currently in session with. From 83e83f6d1d1430a212be08b8ef6f90c2f71c060a Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Sat, 27 Jan 2024 15:52:26 +0100 Subject: [PATCH 03/18] enhancement #36: working exchanging of message. draft --- setup.cfg | 1 + .../consumer.py | 19 ++++++--- .../producer.py | 39 +++++++++++++++++-- src/gst_signalling/gst_abstract_role.py | 38 ++++++++---------- src/gst_signalling/gst_consumer.py | 8 ++-- src/gst_signalling/gst_producer.py | 32 ++++++++++++--- 6 files changed, 98 insertions(+), 39 deletions(-) diff --git a/setup.cfg b/setup.cfg index 39a38f4..9d961dd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,6 +31,7 @@ dev = black==23.3.0 pytest==7.3.1 coverage==7.2.5 mypy==1.0.0 + pygobject-stubs==2.10.0 [options.entry_points] console_scripts = diff --git a/src/example/datachannel-single-producer-multiple-consumer/consumer.py b/src/example/datachannel-single-producer-multiple-consumer/consumer.py index d715371..aaf6a9d 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/consumer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/consumer.py @@ -3,12 +3,20 @@ import logging import os -from aiortc import RTCDataChannel - from gst_signalling import GstSignallingConsumer +from gst_signalling.gst_abstract_role import GstSession from gst_signalling.utils import find_producer_peer_id_by_name +def on_data_channel_message(data_channel, data) -> None: + logging.info(f"Message from DataChannel: {data}") + data_channel.send_string("pong") + + +def on_data_channel_callback(webrtc, data_channel): + data_channel.connect("on-message-string", on_data_channel_message) + + def main(args: argparse.Namespace) -> None: peer_id = find_producer_peer_id_by_name( args.signaling_host, args.signaling_port, args.producer_name @@ -22,22 +30,23 @@ def main(args: argparse.Namespace) -> None: producer_peer_id=peer_id, ) - """ @consumer.on("new_session") # type: ignore[misc] def on_new_session(session: GstSession) -> None: pc = session.pc - + pc.connect("on-data-channel", on_data_channel_callback) + print("heeere") + """ @pc.on("datachannel") # type: ignore[misc] def on_datachannel(channel: RTCDataChannel) -> None: @channel.on("message") # type: ignore[misc] def on_message(message: str) -> None: print("received message:", message) channel.send("pong") + """ @consumer.on("close_session") # type: ignore[misc] def on_close_session(session: GstSession) -> None: close_evt.set() - """ async def run_consumer(consumer: GstSignallingConsumer) -> None: await consumer.connect() diff --git a/src/example/datachannel-single-producer-multiple-consumer/producer.py b/src/example/datachannel-single-producer-multiple-consumer/producer.py index 4cfc9c7..a74c8b0 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/producer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/producer.py @@ -4,9 +4,14 @@ import os import time -import aiortc +from gi.repository import Gst from gst_signalling import GstSignallingProducer +from gst_signalling.gst_abstract_role import GstSession + + +def on_data_channel_message(data_channel, data) -> None: + logging.info(f"Message from DataChannel: {data}") def main(args: argparse.Namespace) -> None: @@ -16,11 +21,36 @@ def main(args: argparse.Namespace) -> None: name=args.name, ) - freq_hz = 10 + FREQ_HZ = 1000 - """ @producer.on("new_session") # type: ignore[misc] def on_new_session(session: GstSession) -> None: + print("heeere") + + def on_open(channel: Gst.Element) -> None: + asyncio.run_coroutine_threadsafe(send_pings(channel), loop) + + async def send_pings(channel: Gst.Element) -> None: + try: + t0 = time.time() + + while True: + dt = time.time() - t0 + channel.send_string(f"ping: {dt:.1f}s") + await asyncio.sleep(1.0 / FREQ_HZ) + except Exception as e: + logging.error(f"{e}") + + pc = session.pc + data_channel = pc.emit("create-data-channel", "chat", None) + if data_channel: + # self.on_data_channel(data_channel, pc) + data_channel.connect("on-open", on_open) + data_channel.connect("on-message-string", on_data_channel_message) + else: + logging.error("Failed to create data channel") + + """ pc = session.pc channel = pc.createDataChannel("chat") @@ -43,7 +73,8 @@ async def send_pings() -> None: @channel.on("open") # type: ignore[misc] def on_open() -> None: asyncio.ensure_future(send_pings()) - """ + """ + # run event loop loop = asyncio.get_event_loop() try: diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index 86def16..b746bae 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -76,38 +76,29 @@ def __del__(self) -> None: Gst.deinit() def make_send_sdp(self, sdp, type: str, session_id: str): # type: ignore[no-untyped-def] - self.logger.debug(f"send sdp {type} {sdp}") + # self.logger.debug(f"send sdp {type} {sdp}") text = sdp.sdp.as_text() msg = {"type": type, "sdp": text} asyncio.run_coroutine_threadsafe( self.send_sdp(session_id, msg), self._asyncloop ) - def send_ice_candidate_message(self, _, mlineindex, candidate): # type: ignore[no-untyped-def] - icemsg = {"ice": {"candidate": candidate, "sdpMLineIndex": mlineindex}} - self.logger.debug(icemsg) - # loop = asyncio.new_event_loop() - # loop.run_until_complete(self.send_sdp(session_id, icemsg)) + def send_ice_candidate_message(self, _, mlineindex, candidate, session_id: str): # type: ignore[no-untyped-def] + icemsg = {"candidate": candidate, "sdpMLineIndex": mlineindex} + # self.logger.debug(f"hello {icemsg} {session_id}") + asyncio.run_coroutine_threadsafe( + self.send_ice(session_id, icemsg), self._asyncloop + ) def init_webrtc(self, session_id: str): # type: ignore[no-untyped-def] webrtc = Gst.ElementFactory.make("webrtcbin") assert webrtc - # webrtc.set_property("bundle-policy", "max-bundle") - # webrtc.set_property("stun-server", "stun://stun.l.google.com:19302") - # webrtc.set_property( - # "turn-server", - # "turn://gstreamer:IsGreatWhenYouCanGetItToWork@webrtc.nirbheek.in:3478", - # ) - webrtc.set_property("stun-server", None) - webrtc.set_property("turn-server", None) - - # webrtc.connect("on-ice-candidate", lambda *args: self._ices.append(args)) - # webrtc.connect("on-negotiation-needed", self.on_negotiation_needed, session_id) - webrtc.connect("on-ice-candidate", self.send_ice_candidate_message) - # webrtc.connect( - # "notify::ice-gathering-state", self.on_ice_gathering_state_notify - # ) + webrtc.set_property("bundle-policy", "max-bundle") + # webrtc.set_property("stun-server", None) + # webrtc.set_property("turn-server", None) + + webrtc.connect("on-ice-candidate", self.send_ice_candidate_message, session_id) self._pipeline.set_state(Gst.State.READY) self._pipeline.add(webrtc) @@ -143,6 +134,11 @@ async def peer_for_session( ) -> None: self.logger.info(f"peer for session {session_id} {message}") + def handle_ice_message(self, webrtc, ice_msg) -> None: # type: ignore[no-untyped-def] + candidate = ice_msg["candidate"] + sdpmlineindex = ice_msg["sdpMLineIndex"] + webrtc.emit("add-ice-candidate", sdpmlineindex, candidate) + async def close_session(self, session_id: str) -> None: self.logger.info("close session") """ diff --git a/src/gst_signalling/gst_consumer.py b/src/gst_signalling/gst_consumer.py index ba2d2a6..f427504 100644 --- a/src/gst_signalling/gst_consumer.py +++ b/src/gst_signalling/gst_consumer.py @@ -44,12 +44,11 @@ def on_answer_created( ) -> None: assert promise.wait() == Gst.PromiseResult.REPLIED reply = promise.get_reply() - answer = reply["answer"] + # answer = reply["answer"] + answer = reply.get_value("answer") promise = Gst.Promise.new() webrtc.emit("set-local-description", answer, promise) promise.interrupt() # we don't care about the result, discard it - # self.send_sdp(answer) - self.logger.debug(f"here {answer}") self.make_send_sdp(answer, "answer", session_id) def on_offer_set( @@ -85,5 +84,8 @@ async def peer_for_session( else: self.logger.error(f"SDP not properly formatted {message['sdp']}") + elif "ice" in message: + self.handle_ice_message(webrtc, message["ice"]) + else: self.logger.error(f"message not processed {message}") diff --git a/src/gst_signalling/gst_producer.py b/src/gst_signalling/gst_producer.py index 69fe481..8dbc04d 100644 --- a/src/gst_signalling/gst_producer.py +++ b/src/gst_signalling/gst_producer.py @@ -24,7 +24,8 @@ def on_offer_created(self, promise: Gst.Promise, webrtc: Gst.Element, session_id self.logger.debug(f"on offer created {promise} {webrtc} {session_id}") assert promise.wait() == Gst.PromiseResult.REPLIED reply = promise.get_reply() - offer = reply["offer"] + # offer = reply["offer"] + offer = reply.get_value("offer") promise = Gst.Promise.new() self.logger.info("Offer created, setting local description") @@ -47,16 +48,34 @@ async def setup_session(self, session_id: str, peer_id: str) -> GstSession: pc = session.pc pc.connect("on-negotiation-needed", self.on_negotiation_needed, session_id) - pc.connect("on-data-channel", self.on_data_channel) - self._pipeline.set_state(Gst.State.PLAYING) + """ + data_channel = pc.emit("create-data-channel", "myLabel", None) + if data_channel: + self.on_data_channel(data_channel, pc) + else: + self.logger.error("Failed to create data channel") + + """ self.emit("new_session", session) return session - def on_data_channel(self, webrtc, channel): - self.logger.info("data_channel created") + # Fonction de callback pour la gestion des messages du canal de données + def on_data_channel_message(self, data_channel, data, length, user_data) -> None: + self.logger.info(f"Message from DataChannel: {data}") + + # Fonction de callback pour la gestion de l'état du canal de données + def on_data_channel_state_change(self, data_channel, user_data) -> None: + state = data_channel.get_state() + self.logger.info(f"DataChannel State Changed: {state}") + + # Fonction de callback pour la création du canal de données + def on_data_channel(self, data_channel, webrtc) -> None: + self.logger.info("DataChannel created") + data_channel.connect("on-message-string", self.on_data_channel_message) + # data_channel.connect("on-state-change", self.on_data_channel_state_change) async def peer_for_session( self, session_id: str, message: Dict[str, Dict[str, str]] @@ -82,6 +101,7 @@ async def peer_for_session( self.logger.warning("producer should not receive the offer") else: self.logger.error(f"SDP not properly formatted {message['sdp']}") - + elif "ice" in message: + self.handle_ice_message(webrtc, message["ice"]) else: self.logger.error(f"message not processed {message}") From 468495c1c7d0054b5124cc2b402321c126a3a48b Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Sat, 27 Jan 2024 19:17:01 +0100 Subject: [PATCH 04/18] enhancement #36: clean up --- setup.cfg | 2 + src/example/datachannel-cli/README.rst | 29 --- src/example/datachannel-cli/cli.py | 123 --------- .../README.md | 4 +- .../consumer.py | 15 +- .../producer.py | 32 +-- src/example/videostream-cli/README.rst | 53 ---- src/example/videostream-cli/cli.py | 168 ------------ src/gst_signalling/__init__.py | 2 - src/gst_signalling/aiortc_adapter.py | 246 ------------------ src/gst_signalling/gst_abstract_role.py | 16 +- src/gst_signalling/gst_consumer.py | 10 +- src/gst_signalling/gst_producer.py | 34 +-- src/gst_signalling/gst_signalling.py | 2 +- tests/test_import.py | 1 - 15 files changed, 28 insertions(+), 709 deletions(-) delete mode 100644 src/example/datachannel-cli/README.rst delete mode 100644 src/example/datachannel-cli/cli.py delete mode 100644 src/example/videostream-cli/README.rst delete mode 100644 src/example/videostream-cli/cli.py delete mode 100644 src/gst_signalling/aiortc_adapter.py diff --git a/setup.cfg b/setup.cfg index 9d961dd..605ce68 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,6 +43,8 @@ exclude = tests,src/example/videostream-cli max-line-length = 128 extend-ignore = E203 max-complexity = 10 +per-file-ignores = + src/gst_signalling/gst_abstract_role.py:E402 [coverage:run] branch=True diff --git a/src/example/datachannel-cli/README.rst b/src/example/datachannel-cli/README.rst deleted file mode 100644 index 93797c3..0000000 --- a/src/example/datachannel-cli/README.rst +++ /dev/null @@ -1,29 +0,0 @@ -Data channel CLI -================ - -This example illustrates the establishment of a data channel using an -RTCPeerConnection and the gstreamer signaling channel to exchange SDP. - -First, run the gtreamer signaling server (see https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/tree/main/net/webrtc for details). - -.. code-block:: console - - $ WEBRTCSINK_SIGNALLING_SERVER_LOG=info gst-webrtc-signalling-server - - -To run the example, you will need instances of the `cli` example: - -- The first takes on the role of the producer and sets its name to `datachannel-cli-producer`. - -.. code-block:: console - - $ python cli.py producer --name datachannel-cli-producer - -- The second takes on the role of the consumer and uses the producer name to find it. - -.. code-block:: console - - $ python cli.py consumer --remote-producer-peer-name datachannel-cli-producer - - -You can also use peer_id instead of peer_name. diff --git a/src/example/datachannel-cli/cli.py b/src/example/datachannel-cli/cli.py deleted file mode 100644 index 5abbdeb..0000000 --- a/src/example/datachannel-cli/cli.py +++ /dev/null @@ -1,123 +0,0 @@ -import argparse -import asyncio -import logging -import time - -from aiortc import RTCIceCandidate, RTCPeerConnection, RTCSessionDescription -from gst_signalling.aiortc_adapter import BYE, add_signaling_arguments, create_signaling - - -def channel_log(channel, t, message): - print("channel(%s) %s %s" % (channel.label, t, message)) - - -def channel_send(channel, message): - channel_log(channel, ">", message) - channel.send(message) - - -async def consume_signaling(pc, signaling): - while True: - obj = await signaling.receive() - - if isinstance(obj, RTCSessionDescription): - await pc.setRemoteDescription(obj) - - if obj.type == "offer": - # send answer - await pc.setLocalDescription(await pc.createAnswer()) - await signaling.send(pc.localDescription) - elif isinstance(obj, RTCIceCandidate): - await pc.addIceCandidate(obj) - elif obj is BYE: - print("Exiting") - break - - -time_start = None - - -def current_stamp(): - global time_start - - if time_start is None: - time_start = time.time() - return 0 - else: - return int((time.time() - time_start) * 1000000) - - -async def run_answer(pc, signaling): - await signaling.connect() - - @pc.on("datachannel") - def on_datachannel(channel): - channel_log(channel, "-", "created by remote party") - - @channel.on("message") - def on_message(message): - channel_log(channel, "<", message) - - if isinstance(message, str) and message.startswith("ping"): - # reply - channel_send(channel, "pong" + message[4:]) - - await consume_signaling(pc, signaling) - - -async def run_offer(pc, signaling): - await signaling.connect() - - channel = pc.createDataChannel("chat") - channel_log(channel, "-", "created by local party") - - async def send_pings(): - while True: - channel_send(channel, "ping %d" % current_stamp()) - await asyncio.sleep(1) - - @channel.on("open") - def on_open(): - asyncio.ensure_future(send_pings()) - - @channel.on("message") - def on_message(message): - channel_log(channel, "<", message) - - if isinstance(message, str) and message.startswith("pong"): - elapsed_ms = (current_stamp() - int(message[5:])) / 1000 - print(" RTT %.2f ms" % elapsed_ms) - - # send offer - await pc.setLocalDescription(await pc.createOffer()) - await signaling.send(pc.localDescription) - - await consume_signaling(pc, signaling) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Data channels ping/pong") - parser.add_argument("--verbose", "-v", action="count") - add_signaling_arguments(parser) - - args = parser.parse_args() - - if args.verbose: - logging.basicConfig(level=logging.INFO) - - signaling = create_signaling(args) - pc = RTCPeerConnection() - if args.role == "producer": - coro = run_offer(pc, signaling) - else: - coro = run_answer(pc, signaling) - - # run event loop - loop = asyncio.get_event_loop() - try: - loop.run_until_complete(coro) - except KeyboardInterrupt: - pass - finally: - loop.run_until_complete(pc.close()) - loop.run_until_complete(signaling.close()) diff --git a/src/example/datachannel-single-producer-multiple-consumer/README.md b/src/example/datachannel-single-producer-multiple-consumer/README.md index eeba226..4ebd69b 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/README.md +++ b/src/example/datachannel-single-producer-multiple-consumer/README.md @@ -7,11 +7,11 @@ This example shows how to use a single producer and multiple consumers. * Starts the producer: ```bash -python producer.py +python producer.py [-vv] ``` * Starts any number of consumers: ```bash -python consumer.py +python consumer.py [-vv] ``` \ No newline at end of file diff --git a/src/example/datachannel-single-producer-multiple-consumer/consumer.py b/src/example/datachannel-single-producer-multiple-consumer/consumer.py index aaf6a9d..74058c4 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/consumer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/consumer.py @@ -3,17 +3,19 @@ import logging import os +from gi.repository import Gst + from gst_signalling import GstSignallingConsumer from gst_signalling.gst_abstract_role import GstSession from gst_signalling.utils import find_producer_peer_id_by_name -def on_data_channel_message(data_channel, data) -> None: +def on_data_channel_message(data_channel, data: str) -> None: # type: ignore[no-untyped-def] logging.info(f"Message from DataChannel: {data}") data_channel.send_string("pong") -def on_data_channel_callback(webrtc, data_channel): +def on_data_channel_callback(webrtc: Gst.Element, data_channel) -> None: # type: ignore[no-untyped-def] data_channel.connect("on-message-string", on_data_channel_message) @@ -34,15 +36,6 @@ def main(args: argparse.Namespace) -> None: def on_new_session(session: GstSession) -> None: pc = session.pc pc.connect("on-data-channel", on_data_channel_callback) - print("heeere") - """ - @pc.on("datachannel") # type: ignore[misc] - def on_datachannel(channel: RTCDataChannel) -> None: - @channel.on("message") # type: ignore[misc] - def on_message(message: str) -> None: - print("received message:", message) - channel.send("pong") - """ @consumer.on("close_session") # type: ignore[misc] def on_close_session(session: GstSession) -> None: diff --git a/src/example/datachannel-single-producer-multiple-consumer/producer.py b/src/example/datachannel-single-producer-multiple-consumer/producer.py index a74c8b0..dfc22ad 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/producer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/producer.py @@ -10,7 +10,7 @@ from gst_signalling.gst_abstract_role import GstSession -def on_data_channel_message(data_channel, data) -> None: +def on_data_channel_message(data_channel, data: str) -> None: # type: ignore[no-untyped-def] logging.info(f"Message from DataChannel: {data}") @@ -21,7 +21,7 @@ def main(args: argparse.Namespace) -> None: name=args.name, ) - FREQ_HZ = 1000 + FREQ_HZ = 100 @producer.on("new_session") # type: ignore[misc] def on_new_session(session: GstSession) -> None: @@ -36,7 +36,7 @@ async def send_pings(channel: Gst.Element) -> None: while True: dt = time.time() - t0 - channel.send_string(f"ping: {dt:.1f}s") + channel.send_string(f"ping: {dt:.1f}s") # type: ignore[attr-defined] await asyncio.sleep(1.0 / FREQ_HZ) except Exception as e: logging.error(f"{e}") @@ -44,37 +44,11 @@ async def send_pings(channel: Gst.Element) -> None: pc = session.pc data_channel = pc.emit("create-data-channel", "chat", None) if data_channel: - # self.on_data_channel(data_channel, pc) data_channel.connect("on-open", on_open) data_channel.connect("on-message-string", on_data_channel_message) else: logging.error("Failed to create data channel") - """ - pc = session.pc - - channel = pc.createDataChannel("chat") - - @channel.on("message") # type: ignore[misc] - def on_message(message: str) -> None: - print("received message:", message) - - async def send_pings() -> None: - try: - t0 = time.time() - - while True: - dt = time.time() - t0 - channel.send(f"ping: {dt:.1f}s") - await asyncio.sleep(1.0 / freq_hz) - except aiortc.exceptions.InvalidStateError: - print("Channel closed") - - @channel.on("open") # type: ignore[misc] - def on_open() -> None: - asyncio.ensure_future(send_pings()) - """ - # run event loop loop = asyncio.get_event_loop() try: diff --git a/src/example/videostream-cli/README.rst b/src/example/videostream-cli/README.rst deleted file mode 100644 index 9fbdccf..0000000 --- a/src/example/videostream-cli/README.rst +++ /dev/null @@ -1,53 +0,0 @@ -Video channel CLI -================= - -This example illustrates the establishment of a video stream using an -RTCPeerConnection. - -It uses the gstreamer signaling mecanisms. - -By default the sent video is an animated French flag, but it is also possible -to use a MediaPlayer to read media from a file. - -This example also illustrates how to use a MediaRecorder to capture media to a -file. - -Running the example -------------------- - -First, run the gtreamer signaling server (see https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/tree/main/net/webrtc for details). - -.. code-block:: console - - $ WEBRTCSINK_SIGNALLING_SERVER_LOG=info gst-webrtc-signalling-server - -To run the example, you will need instances of the `cli` example: - -- The first takes on the role of the producer and sets its name to - `videostream-cli-producer`. - -.. code-block:: console - - $ python cli.py producer --name videostream-cli-producer - -- The second takes on the role of the consumer. - -.. code-block:: console - - $ python cli.py consumer --remote-producer-peer-name videostream-cli-producer - -Additional options ------------------- - -If you want to play a media file instead of sending the example image, run: - -.. code-block:: console - - $ python cli.py --play-from video.mp4 - -If you want to recording the received video you can run one of the following: - -.. code-block:: console - - $ python cli.py answer --record-to video.mp4 - $ python cli.py answer --record-to video-%3d.png diff --git a/src/example/videostream-cli/cli.py b/src/example/videostream-cli/cli.py deleted file mode 100644 index f6b4477..0000000 --- a/src/example/videostream-cli/cli.py +++ /dev/null @@ -1,168 +0,0 @@ -import argparse -import asyncio -import logging -import math - -import cv2 -import numpy -from aiortc import ( - RTCIceCandidate, - RTCPeerConnection, - RTCSessionDescription, - VideoStreamTrack, -) -from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder -from gst_signalling.aiortc_adapter import BYE, add_signaling_arguments, create_signaling -from av import VideoFrame - - -class FlagVideoStreamTrack(VideoStreamTrack): - """ - A video track that returns an animated flag. - """ - - def __init__(self): - super().__init__() # don't forget this! - self.counter = 0 - height, width = 480, 640 - - # generate flag - data_bgr = numpy.hstack( - [ - self._create_rectangle( - width=213, height=480, color=(255, 0, 0) - ), # blue - self._create_rectangle( - width=214, height=480, color=(255, 255, 255) - ), # white - self._create_rectangle(width=213, height=480, color=(0, 0, 255)), # red - ] - ) - - # shrink and center it - M = numpy.float32([[0.5, 0, width / 4], [0, 0.5, height / 4]]) - data_bgr = cv2.warpAffine(data_bgr, M, (width, height)) - - # compute animation - omega = 2 * math.pi / height - id_x = numpy.tile(numpy.array(range(width), dtype=numpy.float32), (height, 1)) - id_y = numpy.tile( - numpy.array(range(height), dtype=numpy.float32), (width, 1) - ).transpose() - - self.frames = [] - for k in range(30): - phase = 2 * k * math.pi / 30 - map_x = id_x + 10 * numpy.cos(omega * id_x + phase) - map_y = id_y + 10 * numpy.sin(omega * id_x + phase) - self.frames.append( - VideoFrame.from_ndarray( - cv2.remap(data_bgr, map_x, map_y, cv2.INTER_LINEAR), format="bgr24" - ) - ) - - async def recv(self): - pts, time_base = await self.next_timestamp() - - frame = self.frames[self.counter % 30] - frame.pts = pts - frame.time_base = time_base - self.counter += 1 - return frame - - def _create_rectangle(self, width, height, color): - data_bgr = numpy.zeros((height, width, 3), numpy.uint8) - data_bgr[:, :] = color - return data_bgr - - -async def run(pc, player, recorder, signaling, role): - def add_tracks(): - if player and player.audio: - pc.addTrack(player.audio) - - if player and player.video: - pc.addTrack(player.video) - else: - pc.addTrack(FlagVideoStreamTrack()) - - @pc.on("track") - def on_track(track): - print("Receiving %s" % track.kind) - recorder.addTrack(track) - - # connect signaling - await signaling.connect() - - if role == "producer": - # send offer - add_tracks() - await pc.setLocalDescription(await pc.createOffer()) - await signaling.send(pc.localDescription) - - # consume signaling - while True: - obj = await signaling.receive() - - if isinstance(obj, RTCSessionDescription): - await pc.setRemoteDescription(obj) - await recorder.start() - - if obj.type == "offer": - # send answer - add_tracks() - await pc.setLocalDescription(await pc.createAnswer()) - await signaling.send(pc.localDescription) - elif isinstance(obj, RTCIceCandidate): - await pc.addIceCandidate(obj) - elif obj is BYE: - print("Exiting") - break - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Video stream from the command line") - parser.add_argument("--play-from", help="Read the media from a file and sent it."), - parser.add_argument("--record-to", help="Write received media to a file."), - parser.add_argument("--verbose", "-v", action="count") - add_signaling_arguments(parser) - args = parser.parse_args() - - if args.verbose: - logging.basicConfig(level=logging.INFO) - - # create signaling and peer connection - signaling = create_signaling(args) - pc = RTCPeerConnection() - - # create media source - if args.play_from: - player = MediaPlayer(args.play_from) - else: - player = None - - # create media sink - if args.record_to: - recorder = MediaRecorder(args.record_to) - else: - recorder = MediaBlackhole() - - # run event loop - loop = asyncio.get_event_loop() - try: - loop.run_until_complete( - run( - pc=pc, - player=player, - recorder=recorder, - signaling=signaling, - role=args.role, - ) - ) - except KeyboardInterrupt: - pass - finally: - # cleanup - loop.run_until_complete(recorder.stop()) - loop.run_until_complete(signaling.close()) - loop.run_until_complete(pc.close()) diff --git a/src/gst_signalling/__init__.py b/src/gst_signalling/__init__.py index e26d838..3851f3f 100644 --- a/src/gst_signalling/__init__.py +++ b/src/gst_signalling/__init__.py @@ -1,5 +1,3 @@ -from .aiortc_adapter import GstSignalingForAiortc # noqa: F401 - # from .gst_abstract_role import GstSession # noqa: F401 from .gst_consumer import GstSignallingConsumer # noqa: F401 from .gst_listener import GstSignallingListener # noqa: F401 diff --git a/src/gst_signalling/aiortc_adapter.py b/src/gst_signalling/aiortc_adapter.py deleted file mode 100644 index 6b5686b..0000000 --- a/src/gst_signalling/aiortc_adapter.py +++ /dev/null @@ -1,246 +0,0 @@ -from aiortc import RTCIceCandidate, RTCSessionDescription -from aiortc.contrib.signaling import object_from_string, object_to_string -from aiortc.sdp import candidate_from_sdp -import argparse -import asyncio -import json -import logging -from typing import Any, Optional, Union - -from .gst_signalling import GstSignalling -from .utils import find_producer_peer_id_by_name - - -class GstSignalingForAiortc: - """Gstreamer signalling for aiortc. - - This class is a wrapper around GstSignalling that provides a simple interface, - that should be mostly compatible with aiortc examples. - """ - - def __init__( - self, - signaling_host: str, - signaling_port: int, - role: str, - name: str, - remote_producer_peer_id: Optional[str] = None, - ): - """Initializes the signalling peer. - - Args: - signaling_host (str): Hostname of the signalling server. - signaling_port (int): Port of the signalling server. - role (str): Signalling role (consumer or producer). - name (str): Peer name. - remote_producer_peer_id (Optional[str], optional): Producer peer_id (required in consumer role!). - """ - self.logger = logging.getLogger(__name__) - self.signalling = GstSignalling(signaling_host, signaling_port) - - if role not in ("consumer", "producer"): - raise ValueError("invalid role {role}") - self.role = role - - self.name = name - - self.peer_id: Optional[str] = None - self.peer_id_evt = asyncio.Event() - - self.session_id: Optional[str] = None - self.session_id_evt = asyncio.Event() - - self.peer_msg_queue: asyncio.Queue[ - Union[RTCSessionDescription, RTCIceCandidate, object] - ] = asyncio.Queue() - - self._setup(remote_producer_peer_id) - - def _setup(self, remote_producer_peer_id: Optional[str] = None) -> None: - @self.signalling.on("Welcome") # type: ignore[arg-type] - def on_welcome(peer_id: str) -> None: - self.logger.info(f"Welcome received, peer_id: {peer_id}") - self.peer_id = peer_id - self.peer_id_evt.set() - - if self.role == "producer": - self._setup_producer() - elif self.role == "consumer": - if remote_producer_peer_id is None: - raise ValueError( - "In consumer role, you have to specify the remote-producer-peer-id!" - ) - self._setup_consumer(remote_producer_peer_id) - - @self.signalling.on("Peer") # type: ignore[arg-type] - async def on_peer(session_id: str, message: dict[str, Any]) -> None: - assert self.session_id == session_id - obj = self._parse_peer_message(message) - if obj is not None: - await self.peer_msg_queue.put(obj) - - @self.signalling.on("EndSession") # type: ignore[arg-type] - async def on_end_session(session_id: str) -> None: - await self.peer_msg_queue.put(BYE) - - @self.signalling.on("Error") # type: ignore[arg-type] - async def on_error(details: str) -> None: - self.logger.error(f'Connection closed with error: "{details}"') - await self.peer_msg_queue.put(BYE) - - def _parse_peer_message( - self, message: dict[str, Any] - ) -> Optional[Union[RTCSessionDescription, RTCIceCandidate]]: - if "sdp" in message: - message = message["sdp"] - elif "ice" in message: - message = message["ice"] - else: - raise ValueError(f"Invalid message {message}") - - if "type" in message: - obj = object_from_string(json.dumps(message)) - return obj - elif "candidate" in message: - if message["candidate"] == "": - self.logger.info("Received empty candidate, ignoring") - return None - - obj = candidate_from_sdp(message["candidate"].split(":", 1)[1]) - obj.sdpMLineIndex = message["sdpMLineIndex"] - return obj - else: - self.logger.error(f"Failed to parse message: {message}") - return None - - def _setup_producer(self) -> None: - @self.signalling.on("StartSession") # type: ignore[arg-type] - def on_start_session(peer_id: str, session_id: str) -> None: - self.logger.info( - f"StartSession received, peer_id: {peer_id}, session_id: {session_id}" - ) - self.session_id = session_id - self.session_id_evt.set() - - def _setup_consumer(self, remote_producer_peer_id: str) -> None: - self.remote_producer_peer_id = remote_producer_peer_id - - @self.signalling.on("SessionStarted") # type: ignore[arg-type] - def on_session_started(peer_id: str, session_id: str) -> None: - self.logger.info( - f"SessionStarted received, peer_id: {peer_id}, session_id: {session_id}" - ) - self.session_id = session_id - self.session_id_evt.set() - - async def connect(self) -> None: - """Connects to the signalling server. - - This method will block until the connection is established (meaning we received a PeerID and a SessionID). - This method has to be called before any other method. - """ - await self.signalling.connect() - await self.peer_id_evt.wait() - - if self.role == "producer": - await self.signalling.set_peer_status(roles=["producer"], name=self.name) - - elif self.role == "consumer": - await self.signalling.start_session(peer_id=self.remote_producer_peer_id) - - await self.session_id_evt.wait() - - self.logger.info( - f"Connected, peer_id: {self.peer_id}, session_id: {self.session_id}" - ) - - async def close(self) -> None: - """Closes the connection to the signalling server.""" - await self.signalling.close() - - async def send( - self, message: Union[RTCIceCandidate, RTCSessionDescription] - ) -> None: - """Sends a message to the other peer (SDP or ICECandidate). - - Args: - message (Union[RTCIceCandidate, RTCSessionDescription]): Message to send. - """ - data = json.loads(object_to_string(message)) - assert self.session_id is not None - - if isinstance(message, RTCSessionDescription): - await self.signalling.send_peer_message(self.session_id, "sdp", data) - elif isinstance(message, RTCIceCandidate): - await self.signalling.send_peer_message(self.session_id, "ice", data) - else: - raise ValueError(f"Invalid message type {type(message)}") - - async def receive(self) -> Union[RTCIceCandidate, RTCSessionDescription]: - """Receives a message from the other peer (SDP or ICECandidate).""" - obj = await self.peer_msg_queue.get() - self.logger.info(f"Received peer message: {obj}") - return obj - - -BYE = object() - - -def create_signaling(args: argparse.Namespace) -> GstSignalingForAiortc: - """Creates a GstSignalingForAiortc instance from command line arguments.""" - if args.role == "consumer": - if ( - args.remote_producer_peer_id is None - and args.remote_producer_peer_name is None - ): - raise ValueError( - "In consumer role, you have to specify the remote-producer-peer-id or remote-producer-peer-name!" - ) - elif args.remote_producer_peer_id is not None: - remote_producer_peer_id = args.remote_producer_peer_id - else: - remote_producer_peer_id = find_producer_peer_id_by_name( - host=args.signaling_host, - port=args.signaling_port, - name=args.remote_producer_peer_name, - ) - else: - remote_producer_peer_id = None - - return GstSignalingForAiortc( - signaling_host=args.signaling_host, - signaling_port=args.signaling_port, - role=args.role, - name=args.name, - remote_producer_peer_id=remote_producer_peer_id, - ) - - -def add_signaling_arguments(parser: argparse.ArgumentParser) -> None: - """Adds command line arguments for GstSignalingForAiortc. - - * signaling-host: Hostname of the signalling server. - * signaling-port: Port of the signalling server. - * role: Signalling role (consumer or producer). - * name: Peer name. - * remote-producer-peer-id: Producer peer_id (required in consumer role!). - """ - parser.add_argument( - "--signaling-host", default="127.0.0.1", help="Gstreamer signaling host" - ) - parser.add_argument( - "--signaling-port", default=8443, help="Gstreamer signaling port" - ) - parser.add_argument("role", choices=["consumer", "producer"], help="Signaling role") - parser.add_argument("--name", default="aiortc-peer", help="peer name") - parser.add_argument( - "--remote-producer-peer-id", - type=str, - help="producer peer_id (in consumer role, either set this or remote-producer-peer-name!)", - ) - parser.add_argument( - "--remote-producer-peer-name", - type=str, - default="aiortc-peer", - help="producer peer_name (in consumer role, either set this or remote-producer-peer-id!)", - ) diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index b746bae..ee68614 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -75,22 +75,24 @@ async def on_end_session(session_id: str) -> None: def __del__(self) -> None: Gst.deinit() - def make_send_sdp(self, sdp, type: str, session_id: str): # type: ignore[no-untyped-def] - # self.logger.debug(f"send sdp {type} {sdp}") + def make_send_sdp( + self, sdp: Any, type: str, session_id: str + ) -> None: # sdp is GstWebRTC.WebRTCSessionDescription text = sdp.sdp.as_text() msg = {"type": type, "sdp": text} asyncio.run_coroutine_threadsafe( self.send_sdp(session_id, msg), self._asyncloop ) - def send_ice_candidate_message(self, _, mlineindex, candidate, session_id: str): # type: ignore[no-untyped-def] + def send_ice_candidate_message( + self, _: Gst.Element, mlineindex: int, candidate: str, session_id: str + ) -> None: icemsg = {"candidate": candidate, "sdpMLineIndex": mlineindex} - # self.logger.debug(f"hello {icemsg} {session_id}") asyncio.run_coroutine_threadsafe( self.send_ice(session_id, icemsg), self._asyncloop ) - def init_webrtc(self, session_id: str): # type: ignore[no-untyped-def] + def init_webrtc(self, session_id: str) -> Gst.Element: webrtc = Gst.ElementFactory.make("webrtcbin") assert webrtc @@ -134,7 +136,7 @@ async def peer_for_session( ) -> None: self.logger.info(f"peer for session {session_id} {message}") - def handle_ice_message(self, webrtc, ice_msg) -> None: # type: ignore[no-untyped-def] + def handle_ice_message(self, webrtc: Gst.Element, ice_msg: Dict[str, Any]) -> None: candidate = ice_msg["candidate"] sdpmlineindex = ice_msg["sdpMLineIndex"] webrtc.emit("add-ice-candidate", sdpmlineindex, candidate) @@ -152,5 +154,5 @@ async def close_session(self, session_id: str) -> None: async def send_sdp(self, session_id: str, sdp: Dict[str, Dict[str, str]]) -> None: await self.signalling.send_peer_message(session_id, "sdp", sdp) - async def send_ice(self, session_id: str, ice: Dict[str, Dict[str, str]]) -> None: + async def send_ice(self, session_id: str, ice: Dict[str, Any]) -> None: await self.signalling.send_peer_message(session_id, "ice", ice) diff --git a/src/gst_signalling/gst_consumer.py b/src/gst_signalling/gst_consumer.py index f427504..cdbc9b8 100644 --- a/src/gst_signalling/gst_consumer.py +++ b/src/gst_signalling/gst_consumer.py @@ -5,14 +5,6 @@ from .gst_abstract_role import GstSession, GstSignallingAbstractRole -""" -try: - from gi.overrides import Gst as _ -except ImportError: - print("gstreamer-python binding overrides aren't available, please install them") - raise -""" - class GstSignallingConsumer(GstSignallingAbstractRole): def __init__( @@ -45,7 +37,7 @@ def on_answer_created( assert promise.wait() == Gst.PromiseResult.REPLIED reply = promise.get_reply() # answer = reply["answer"] - answer = reply.get_value("answer") + answer = reply.get_value("answer") # type: ignore[union-attr] promise = Gst.Promise.new() webrtc.emit("set-local-description", answer, promise) promise.interrupt() # we don't care about the result, discard it diff --git a/src/gst_signalling/gst_producer.py b/src/gst_signalling/gst_producer.py index 8dbc04d..07f9e31 100644 --- a/src/gst_signalling/gst_producer.py +++ b/src/gst_signalling/gst_producer.py @@ -1,5 +1,5 @@ import logging -from typing import Any, Dict +from typing import Dict from gi.repository import Gst, GstSdp, GstWebRTC @@ -20,12 +20,13 @@ async def serve4ever(self) -> None: await self.connect() await self.consume() - def on_offer_created(self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str): # type: ignore[no-untyped-def] + def on_offer_created( + self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str + ) -> None: self.logger.debug(f"on offer created {promise} {webrtc} {session_id}") assert promise.wait() == Gst.PromiseResult.REPLIED reply = promise.get_reply() - # offer = reply["offer"] - offer = reply.get_value("offer") + offer = reply.get_value("offer") # type: ignore[union-attr] promise = Gst.Promise.new() self.logger.info("Offer created, setting local description") @@ -33,7 +34,7 @@ def on_offer_created(self, promise: Gst.Promise, webrtc: Gst.Element, session_id promise.interrupt() self.make_send_sdp(offer, "offer", session_id) - def on_negotiation_needed(self, element, session_id): # type: ignore[no-untyped-def] + def on_negotiation_needed(self, element: Gst.Element, session_id: str) -> None: self.logger.debug(f"on negociation needed {element} {session_id}") promise = Gst.Promise.new_with_change_func( self.on_offer_created, element, session_id @@ -50,33 +51,10 @@ async def setup_session(self, session_id: str, peer_id: str) -> GstSession: self._pipeline.set_state(Gst.State.PLAYING) - """ - data_channel = pc.emit("create-data-channel", "myLabel", None) - if data_channel: - self.on_data_channel(data_channel, pc) - else: - self.logger.error("Failed to create data channel") - - """ self.emit("new_session", session) return session - # Fonction de callback pour la gestion des messages du canal de données - def on_data_channel_message(self, data_channel, data, length, user_data) -> None: - self.logger.info(f"Message from DataChannel: {data}") - - # Fonction de callback pour la gestion de l'état du canal de données - def on_data_channel_state_change(self, data_channel, user_data) -> None: - state = data_channel.get_state() - self.logger.info(f"DataChannel State Changed: {state}") - - # Fonction de callback pour la création du canal de données - def on_data_channel(self, data_channel, webrtc) -> None: - self.logger.info("DataChannel created") - data_channel.connect("on-message-string", self.on_data_channel_message) - # data_channel.connect("on-state-change", self.on_data_channel_state_change) - async def peer_for_session( self, session_id: str, message: Dict[str, Dict[str, str]] ) -> None: diff --git a/src/gst_signalling/gst_signalling.py b/src/gst_signalling/gst_signalling.py index 8fb2564..fcb05b6 100644 --- a/src/gst_signalling/gst_signalling.py +++ b/src/gst_signalling/gst_signalling.py @@ -202,7 +202,7 @@ async def end_session(self, session_id: str) -> None: await self._send(message) async def send_peer_message( - self, session_id: str, type: str, peer_message: Dict[str, Dict[str, str]] + self, session_id: str, type: str, peer_message: Dict[str, Any] ) -> None: """Sends a message to a peer the sender is currently in session with. diff --git a/tests/test_import.py b/tests/test_import.py index df6ef81..4a3ed5b 100644 --- a/tests/test_import.py +++ b/tests/test_import.py @@ -1,3 +1,2 @@ def test_import(): from gst_signalling.gst_signalling import GstSignalling - from gst_signalling.aiortc_adapter import GstSignalingForAiortc From 4d0ec68f57e0f72cde8977b6a8dbca5c1ef9d4fe Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Sat, 27 Jan 2024 19:20:15 +0100 Subject: [PATCH 05/18] enhancement #36: cleanup --- setup.cfg | 2 ++ .../producer.py | 4 ++++ src/gst_signalling/gst_consumer.py | 6 ++++++ 3 files changed, 12 insertions(+) diff --git a/setup.cfg b/setup.cfg index 605ce68..f8d3da6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -45,6 +45,8 @@ extend-ignore = E203 max-complexity = 10 per-file-ignores = src/gst_signalling/gst_abstract_role.py:E402 + src/gst_signalling/gst_consumer.py:E402 + src/example/datachannel-single-producer-multiple-consumer/producer.py:E402 [coverage:run] branch=True diff --git a/src/example/datachannel-single-producer-multiple-consumer/producer.py b/src/example/datachannel-single-producer-multiple-consumer/producer.py index dfc22ad..463c375 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/producer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/producer.py @@ -4,6 +4,10 @@ import os import time +import gi + +gi.require_version("Gst", "1.0") + from gi.repository import Gst from gst_signalling import GstSignallingProducer diff --git a/src/gst_signalling/gst_consumer.py b/src/gst_signalling/gst_consumer.py index cdbc9b8..41bcc5f 100644 --- a/src/gst_signalling/gst_consumer.py +++ b/src/gst_signalling/gst_consumer.py @@ -1,6 +1,12 @@ import logging from typing import Dict +import gi + +gi.require_version("Gst", "1.0") +gi.require_version("GstWebRTC", "1.0") +gi.require_version("GstSdp", "1.0") + from gi.repository import Gst, GstSdp, GstWebRTC from .gst_abstract_role import GstSession, GstSignallingAbstractRole From 4d0dd22f526c40b0cd81cde3d8c0bd4269303297 Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Sun, 28 Jan 2024 14:53:10 +0100 Subject: [PATCH 06/18] enhancement #36: tested with gRPC Bridge --- setup.cfg | 7 +++---- .../producer.py | 2 -- src/gst_signalling/gst_abstract_role.py | 6 +++--- src/gst_signalling/gst_signalling.py | 6 +++--- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/setup.cfg b/setup.cfg index f8d3da6..56fb12b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,10 +16,9 @@ include_package_data = True package_dir= =src install_requires = - aiortc - numpy - pyee - websockets + numpy==1.25.1 + pyee==11.0.1 + websockets==11.0.3 PyGObject==3.42.2 [options.packages.find] diff --git a/src/example/datachannel-single-producer-multiple-consumer/producer.py b/src/example/datachannel-single-producer-multiple-consumer/producer.py index 463c375..45c7aca 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/producer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/producer.py @@ -29,8 +29,6 @@ def main(args: argparse.Namespace) -> None: @producer.on("new_session") # type: ignore[misc] def on_new_session(session: GstSession) -> None: - print("heeere") - def on_open(channel: Gst.Element) -> None: asyncio.run_coroutine_threadsafe(send_pings(channel), loop) diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index ee68614..b9c5820 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -3,7 +3,7 @@ from typing import Any, Dict, NamedTuple, Optional import gi -import pyee +from pyee.asyncio import AsyncIOEventEmitter from .gst_signalling import GstSignalling @@ -21,13 +21,13 @@ ) -class GstSignallingAbstractRole(pyee.AsyncIOEventEmitter): +class GstSignallingAbstractRole(AsyncIOEventEmitter): def __init__( self, host: str, port: int, ) -> None: - pyee.AsyncIOEventEmitter.__init__(self) # type: ignore[no-untyped-call] + super().__init__() self.logger = logging.getLogger(__name__) diff --git a/src/gst_signalling/gst_signalling.py b/src/gst_signalling/gst_signalling.py index fcb05b6..d459021 100644 --- a/src/gst_signalling/gst_signalling.py +++ b/src/gst_signalling/gst_signalling.py @@ -3,11 +3,11 @@ import logging from typing import Any, Dict, List, Optional -import pyee +from pyee.asyncio import AsyncIOEventEmitter from websockets.legacy.client import WebSocketClientProtocol, connect -class GstSignalling(pyee.AsyncIOEventEmitter): +class GstSignalling(AsyncIOEventEmitter): """Signalling peer for the GStreamer WebRTC implementation. This class is used to communicate with a GStreamer WebRTC signalling server. @@ -48,7 +48,7 @@ def __init__(self, host: str, port: int) -> None: Args: host (str): Hostname of the signalling server. port (int): Port of the signalling server.""" - pyee.AsyncIOEventEmitter.__init__(self) # type: ignore[no-untyped-call] + AsyncIOEventEmitter.__init__(self) self.logger = logging.getLogger(__name__) From f10a6a3d2ecea73e6fe28023896d4921817e53e1 Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Mon, 29 Jan 2024 08:45:29 +0100 Subject: [PATCH 07/18] enhancement #36: set latency buffer --- src/gst_signalling/gst_abstract_role.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index b9c5820..717bc9d 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -97,6 +97,7 @@ def init_webrtc(self, session_id: str) -> Gst.Element: assert webrtc webrtc.set_property("bundle-policy", "max-bundle") + webrtc.set_property("latency", 0) # webrtc.set_property("stun-server", None) # webrtc.set_property("turn-server", None) From 9330821847e09eaf3bc81abb672f371fd732b70b Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Thu, 1 Feb 2024 14:57:48 +0100 Subject: [PATCH 08/18] enhancement #36: removing latency parameter which is not related to data channels --- src/gst_signalling/gst_abstract_role.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index 717bc9d..b9c5820 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -97,7 +97,6 @@ def init_webrtc(self, session_id: str) -> Gst.Element: assert webrtc webrtc.set_property("bundle-policy", "max-bundle") - webrtc.set_property("latency", 0) # webrtc.set_property("stun-server", None) # webrtc.set_property("turn-server", None) From 4aaebbe98ae2a63bcaf35783b1377e162a7f7f66 Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Thu, 1 Feb 2024 17:02:58 +0100 Subject: [PATCH 09/18] enhancement #36: fix mypy --- .../producer.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/example/datachannel-single-producer-multiple-consumer/producer.py b/src/example/datachannel-single-producer-multiple-consumer/producer.py index 45c7aca..4ccb7ee 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/producer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/producer.py @@ -8,13 +8,15 @@ gi.require_version("Gst", "1.0") -from gi.repository import Gst +from gi.repository import GstWebRTC from gst_signalling import GstSignallingProducer from gst_signalling.gst_abstract_role import GstSession -def on_data_channel_message(data_channel, data: str) -> None: # type: ignore[no-untyped-def] +def on_data_channel_message( + data_channel: GstWebRTC.WebRTCDataChannel, data: str +) -> None: logging.info(f"Message from DataChannel: {data}") @@ -29,16 +31,16 @@ def main(args: argparse.Namespace) -> None: @producer.on("new_session") # type: ignore[misc] def on_new_session(session: GstSession) -> None: - def on_open(channel: Gst.Element) -> None: + def on_open(channel: GstWebRTC.WebRTCDataChannel) -> None: asyncio.run_coroutine_threadsafe(send_pings(channel), loop) - async def send_pings(channel: Gst.Element) -> None: + async def send_pings(channel: GstWebRTC.WebRTCDataChannel) -> None: try: t0 = time.time() while True: dt = time.time() - t0 - channel.send_string(f"ping: {dt:.1f}s") # type: ignore[attr-defined] + channel.send_string(f"ping: {dt:.1f}s") await asyncio.sleep(1.0 / FREQ_HZ) except Exception as e: logging.error(f"{e}") From a6c16f1426791b3097b7a7b37ad1903a389ac0fb Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Thu, 1 Feb 2024 17:18:41 +0100 Subject: [PATCH 10/18] enhancement #36: removing numpy constraint --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 56fb12b..3784f05 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,7 +16,7 @@ include_package_data = True package_dir= =src install_requires = - numpy==1.25.1 + numpy pyee==11.0.1 websockets==11.0.3 PyGObject==3.42.2 From 96bdbf286963702b82f32df2708b3d47e0a9e60d Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Thu, 1 Feb 2024 17:22:51 +0100 Subject: [PATCH 11/18] enhancement #36: adding dependencies to ci/cd for building pygobject --- .github/workflows/lint.yml | 3 +++ .github/workflows/pytest.yml | 1 + 2 files changed, 4 insertions(+) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 9843623..8e7ab49 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -24,6 +24,7 @@ jobs: cache: 'pip' # caching pip dependencies - name: Install dependencies run: | + sudo apt install -y libgirepository1.0-dev python -m pip install --upgrade pip pip install .[dev] - name: Lint with flake8 @@ -44,6 +45,7 @@ jobs: cache: 'pip' # caching pip dependencies - name: Install dependencies run: | + sudo apt install -y libgirepository1.0-dev python -m pip install --upgrade pip pip install .[dev] - name : Lint with mypy @@ -60,6 +62,7 @@ jobs: cache: 'pip' # caching pip dependencies - name: Install dependencies run: | + sudo apt install -y libgirepository1.0-dev python -m pip install --upgrade pip pip install .[dev] - name: Test with pytest diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index e6af4ce..d06d237 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -16,6 +16,7 @@ jobs: cache: 'pip' # caching pip dependencies - name: Install dependencies run: | + sudo apt install -y libgirepository1.0-dev python -m pip install --upgrade pip pip install .[dev] - name: Unit tests From b1836d7ad4014c07a574ead6887d901cf0c9b222 Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Thu, 1 Feb 2024 17:26:58 +0100 Subject: [PATCH 12/18] enhancement #36: ignore flake8 false error --- setup.cfg | 5 +---- .../producer.py | 6 +++--- src/gst_signalling/gst_abstract_role.py | 2 +- src/gst_signalling/gst_consumer.py | 4 ++-- 4 files changed, 7 insertions(+), 10 deletions(-) diff --git a/setup.cfg b/setup.cfg index 3784f05..3014d7d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -42,10 +42,7 @@ exclude = tests,src/example/videostream-cli max-line-length = 128 extend-ignore = E203 max-complexity = 10 -per-file-ignores = - src/gst_signalling/gst_abstract_role.py:E402 - src/gst_signalling/gst_consumer.py:E402 - src/example/datachannel-single-producer-multiple-consumer/producer.py:E402 + [coverage:run] branch=True diff --git a/src/example/datachannel-single-producer-multiple-consumer/producer.py b/src/example/datachannel-single-producer-multiple-consumer/producer.py index 4ccb7ee..02cac97 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/producer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/producer.py @@ -8,10 +8,10 @@ gi.require_version("Gst", "1.0") -from gi.repository import GstWebRTC +from gi.repository import GstWebRTC # noqa : E402 -from gst_signalling import GstSignallingProducer -from gst_signalling.gst_abstract_role import GstSession +from gst_signalling import GstSignallingProducer # noqa : E402 +from gst_signalling.gst_abstract_role import GstSession # noqa : E402 def on_data_channel_message( diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index b9c5820..1d0e2c4 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -10,7 +10,7 @@ gi.require_version("Gst", "1.0") gi.require_version("GstWebRTC", "1.0") -from gi.repository import Gst +from gi.repository import Gst # noqa : E402 GstSession = NamedTuple( "GstSession", diff --git a/src/gst_signalling/gst_consumer.py b/src/gst_signalling/gst_consumer.py index 41bcc5f..d8cddfe 100644 --- a/src/gst_signalling/gst_consumer.py +++ b/src/gst_signalling/gst_consumer.py @@ -7,9 +7,9 @@ gi.require_version("GstWebRTC", "1.0") gi.require_version("GstSdp", "1.0") -from gi.repository import Gst, GstSdp, GstWebRTC +from gi.repository import Gst, GstSdp, GstWebRTC # noqa : E402 -from .gst_abstract_role import GstSession, GstSignallingAbstractRole +from .gst_abstract_role import GstSession, GstSignallingAbstractRole # noqa : E402 class GstSignallingConsumer(GstSignallingAbstractRole): From 5e07cefaac887c78cc6fe34605c9beb45bfa1dc6 Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Thu, 1 Feb 2024 18:43:42 +0100 Subject: [PATCH 13/18] enhanbcement #36: removing pyogbject constaints --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 3014d7d..66bd50b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -19,7 +19,7 @@ install_requires = numpy pyee==11.0.1 websockets==11.0.3 - PyGObject==3.42.2 + PyGObject [options.packages.find] where=src From 11ac274b43fa3cf4129102a0a65e107bdadfa70d Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Thu, 1 Feb 2024 19:01:19 +0100 Subject: [PATCH 14/18] enhancmeent #36: forcing ci/Cd to install gstreamer --- .github/workflows/pytest.yml | 2 +- setup.cfg | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index d06d237..956b938 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -16,7 +16,7 @@ jobs: cache: 'pip' # caching pip dependencies - name: Install dependencies run: | - sudo apt install -y libgirepository1.0-dev + sudo apt install -y libgirepository1.0-dev gstreamer1.0-plugins-bad python -m pip install --upgrade pip pip install .[dev] - name: Unit tests diff --git a/setup.cfg b/setup.cfg index 66bd50b..3014d7d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -19,7 +19,7 @@ install_requires = numpy pyee==11.0.1 websockets==11.0.3 - PyGObject + PyGObject==3.42.2 [options.packages.find] where=src From 6435d9fc1e99d0ba5c1c212edfea5cf9888a9f94 Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Thu, 1 Feb 2024 19:08:40 +0100 Subject: [PATCH 15/18] enhancement #36: removing the ci/cd that tests nothing --- .github/workflows/pytest.yml | 55 ------------------------------------ scripts/git_hooks/pre-commit | 2 -- tests/__init__.py | 0 tests/test_import.py | 2 -- 4 files changed, 59 deletions(-) delete mode 100644 .github/workflows/pytest.yml delete mode 100644 tests/__init__.py delete mode 100644 tests/test_import.py diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml deleted file mode 100644 index 956b938..0000000 --- a/.github/workflows/pytest.yml +++ /dev/null @@ -1,55 +0,0 @@ -name: Pytest - -on: [pull_request] - -jobs: - tests: - - runs-on: ubuntu-20.04 - - steps: - - uses: actions/checkout@v3 - - name: Set up Python 3.10 - uses: actions/setup-python@v4 - with: - python-version: "3.10" - cache: 'pip' # caching pip dependencies - - name: Install dependencies - run: | - sudo apt install -y libgirepository1.0-dev gstreamer1.0-plugins-bad - python -m pip install --upgrade pip - pip install .[dev] - - name: Unit tests - run: | - coverage run -m pytest - coverage xml - coverage json - coverage html - - name: Archive code coverage html report - uses: actions/upload-artifact@v3 - with: - name: code-coverage-report - path: htmlcov - - name: Get Cover - uses: orgoro/coverage@v3 - with: - coverageFile: coverage.xml - token: ${{ secrets.GITHUB_TOKEN }} - - name: Extract results - run: | - export TOTAL=$(python -c "import json;print(json.load(open('coverage.json'))['totals']['percent_covered_display'])") - echo "total=$TOTAL" >> $GITHUB_ENV - echo "### Total coverage: ${TOTAL}%" >> $GITHUB_STEP_SUMMARY - - name: Make badge - uses: schneegans/dynamic-badges-action@v1.6.0 - with: - # GIST_TOKEN is a GitHub personal access token with scope "gist". - auth: ${{ secrets.GIST_TOKEN }} - gistID: 230b4e6552d2c13164e28598eab5c1e3 - filename: gst-signalling-covbadge.json - label: Coverage - message: ${{ env.total }}% - minColorRange: 50 - maxColorRange: 90 - valColorRange: ${{ env.total }} - diff --git a/scripts/git_hooks/pre-commit b/scripts/git_hooks/pre-commit index 62a6011..6192b96 100755 --- a/scripts/git_hooks/pre-commit +++ b/scripts/git_hooks/pre-commit @@ -20,5 +20,3 @@ echo "-------> Flake8 passed!" mypy . echo "-------> Mypy passed!" -# Run pytest -coverage run -m pytest diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/test_import.py b/tests/test_import.py deleted file mode 100644 index 4a3ed5b..0000000 --- a/tests/test_import.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_import(): - from gst_signalling.gst_signalling import GstSignalling From b6da6db0025165a83c4bd0c31dd63f7779d39ebf Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Thu, 1 Feb 2024 19:12:34 +0100 Subject: [PATCH 16/18] enhancement #36 removing missing hidden pytest in cicd --- .github/workflows/lint.yml | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 8e7ab49..afdc1f4 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -50,21 +50,3 @@ jobs: pip install .[dev] - name : Lint with mypy run : mypy . -v - - pytest: - runs-on: ubuntu-20.04 - steps: - - uses: actions/checkout@v3 - - name: Set up Python 3.10 - uses: actions/setup-python@v4 - with: - python-version: "3.10" - cache: 'pip' # caching pip dependencies - - name: Install dependencies - run: | - sudo apt install -y libgirepository1.0-dev - python -m pip install --upgrade pip - pip install .[dev] - - name: Test with pytest - run: | - coverage run -m pytest \ No newline at end of file From 6fc2be9f6e3e9ae4e595606f0b430cbd6c8a72e8 Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Wed, 21 Feb 2024 17:21:26 +0100 Subject: [PATCH 17/18] enhancement #36: moving arg parser to utils --- src/gst_signalling/utils.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/gst_signalling/utils.py b/src/gst_signalling/utils.py index f9197e9..860a20c 100644 --- a/src/gst_signalling/utils.py +++ b/src/gst_signalling/utils.py @@ -1,5 +1,6 @@ import asyncio from typing import Dict +import argparse from .gst_signalling import GstSignalling @@ -56,3 +57,32 @@ def find_producer_peer_id_by_name(host: str, port: int, name: str) -> str: return producer_id raise KeyError(f"Producer {name} not found.") + +def add_signaling_arguments(parser: argparse.ArgumentParser) -> None: + """Adds command line arguments for GstSignalingForAiortc. + + * signaling-host: Hostname of the signalling server. + * signaling-port: Port of the signalling server. + * role: Signalling role (consumer or producer). + * name: Peer name. + * remote-producer-peer-id: Producer peer_id (required in consumer role!). + """ + parser.add_argument( + "--signaling-host", default="127.0.0.1", help="Gstreamer signaling host" + ) + parser.add_argument( + "--signaling-port", default=8443, help="Gstreamer signaling port" + ) + parser.add_argument("role", choices=["consumer", "producer"], help="Signaling role") + parser.add_argument("--name", default="aiortc-peer", help="peer name") + parser.add_argument( + "--remote-producer-peer-id", + type=str, + help="producer peer_id (in consumer role, either set this or remote-producer-peer-name!)", + ) + parser.add_argument( + "--remote-producer-peer-name", + type=str, + default="aiortc-peer", + help="producer peer_name (in consumer role, either set this or remote-producer-peer-id!)", + ) \ No newline at end of file From a2122a0b7beb486bdd18fbf69c6086f0f2d683d7 Mon Sep 17 00:00:00 2001 From: Fabien Danieau Date: Wed, 28 Feb 2024 15:51:13 +0100 Subject: [PATCH 18/18] enhancement #36: fix black --- src/gst_signalling/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/gst_signalling/utils.py b/src/gst_signalling/utils.py index 860a20c..fca48f1 100644 --- a/src/gst_signalling/utils.py +++ b/src/gst_signalling/utils.py @@ -58,6 +58,7 @@ def find_producer_peer_id_by_name(host: str, port: int, name: str) -> str: raise KeyError(f"Producer {name} not found.") + def add_signaling_arguments(parser: argparse.ArgumentParser) -> None: """Adds command line arguments for GstSignalingForAiortc. @@ -85,4 +86,4 @@ def add_signaling_arguments(parser: argparse.ArgumentParser) -> None: type=str, default="aiortc-peer", help="producer peer_name (in consumer role, either set this or remote-producer-peer-id!)", - ) \ No newline at end of file + )