diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 9843623..afdc1f4 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -24,6 +24,7 @@ jobs: cache: 'pip' # caching pip dependencies - name: Install dependencies run: | + sudo apt install -y libgirepository1.0-dev python -m pip install --upgrade pip pip install .[dev] - name: Lint with flake8 @@ -44,24 +45,8 @@ jobs: cache: 'pip' # caching pip dependencies - name: Install dependencies run: | + sudo apt install -y libgirepository1.0-dev python -m pip install --upgrade pip pip install .[dev] - name : Lint with mypy run : mypy . -v - - pytest: - runs-on: ubuntu-20.04 - steps: - - uses: actions/checkout@v3 - - name: Set up Python 3.10 - uses: actions/setup-python@v4 - with: - python-version: "3.10" - cache: 'pip' # caching pip dependencies - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install .[dev] - - name: Test with pytest - run: | - coverage run -m pytest \ No newline at end of file diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml deleted file mode 100644 index e6af4ce..0000000 --- a/.github/workflows/pytest.yml +++ /dev/null @@ -1,54 +0,0 @@ -name: Pytest - -on: [pull_request] - -jobs: - tests: - - runs-on: ubuntu-20.04 - - steps: - - uses: actions/checkout@v3 - - name: Set up Python 3.10 - uses: actions/setup-python@v4 - with: - python-version: "3.10" - cache: 'pip' # caching pip dependencies - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install .[dev] - - name: Unit tests - run: | - coverage run -m pytest - coverage xml - coverage json - coverage html - - name: Archive code coverage html report - uses: actions/upload-artifact@v3 - with: - name: code-coverage-report - path: htmlcov - - name: Get Cover - uses: orgoro/coverage@v3 - with: - coverageFile: coverage.xml - token: ${{ secrets.GITHUB_TOKEN }} - - name: Extract results - run: | - export TOTAL=$(python -c "import json;print(json.load(open('coverage.json'))['totals']['percent_covered_display'])") - echo "total=$TOTAL" >> $GITHUB_ENV - echo "### Total coverage: ${TOTAL}%" >> $GITHUB_STEP_SUMMARY - - name: Make badge - uses: schneegans/dynamic-badges-action@v1.6.0 - with: - # GIST_TOKEN is a GitHub personal access token with scope "gist". - auth: ${{ secrets.GIST_TOKEN }} - gistID: 230b4e6552d2c13164e28598eab5c1e3 - filename: gst-signalling-covbadge.json - label: Coverage - message: ${{ env.total }}% - minColorRange: 50 - maxColorRange: 90 - valColorRange: ${{ env.total }} - diff --git a/scripts/git_hooks/pre-commit b/scripts/git_hooks/pre-commit index 62a6011..6192b96 100755 --- a/scripts/git_hooks/pre-commit +++ b/scripts/git_hooks/pre-commit @@ -20,5 +20,3 @@ echo "-------> Flake8 passed!" mypy . echo "-------> Mypy passed!" -# Run pytest -coverage run -m pytest diff --git a/setup.cfg b/setup.cfg index 92de1e6..3014d7d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,10 +16,10 @@ include_package_data = True package_dir= =src install_requires = - aiortc numpy - pyee - websockets + pyee==11.0.1 + websockets==11.0.3 + PyGObject==3.42.2 [options.packages.find] where=src @@ -30,6 +30,7 @@ dev = black==23.3.0 pytest==7.3.1 coverage==7.2.5 mypy==1.0.0 + pygobject-stubs==2.10.0 [options.entry_points] console_scripts = @@ -42,6 +43,7 @@ max-line-length = 128 extend-ignore = E203 max-complexity = 10 + [coverage:run] branch=True diff --git a/src/example/datachannel-cli/README.rst b/src/example/datachannel-cli/README.rst deleted file mode 100644 index 93797c3..0000000 --- a/src/example/datachannel-cli/README.rst +++ /dev/null @@ -1,29 +0,0 @@ -Data channel CLI -================ - -This example illustrates the establishment of a data channel using an -RTCPeerConnection and the gstreamer signaling channel to exchange SDP. - -First, run the gtreamer signaling server (see https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/tree/main/net/webrtc for details). - -.. code-block:: console - - $ WEBRTCSINK_SIGNALLING_SERVER_LOG=info gst-webrtc-signalling-server - - -To run the example, you will need instances of the `cli` example: - -- The first takes on the role of the producer and sets its name to `datachannel-cli-producer`. - -.. code-block:: console - - $ python cli.py producer --name datachannel-cli-producer - -- The second takes on the role of the consumer and uses the producer name to find it. - -.. code-block:: console - - $ python cli.py consumer --remote-producer-peer-name datachannel-cli-producer - - -You can also use peer_id instead of peer_name. diff --git a/src/example/datachannel-cli/cli.py b/src/example/datachannel-cli/cli.py deleted file mode 100644 index 5abbdeb..0000000 --- a/src/example/datachannel-cli/cli.py +++ /dev/null @@ -1,123 +0,0 @@ -import argparse -import asyncio -import logging -import time - -from aiortc import RTCIceCandidate, RTCPeerConnection, RTCSessionDescription -from gst_signalling.aiortc_adapter import BYE, add_signaling_arguments, create_signaling - - -def channel_log(channel, t, message): - print("channel(%s) %s %s" % (channel.label, t, message)) - - -def channel_send(channel, message): - channel_log(channel, ">", message) - channel.send(message) - - -async def consume_signaling(pc, signaling): - while True: - obj = await signaling.receive() - - if isinstance(obj, RTCSessionDescription): - await pc.setRemoteDescription(obj) - - if obj.type == "offer": - # send answer - await pc.setLocalDescription(await pc.createAnswer()) - await signaling.send(pc.localDescription) - elif isinstance(obj, RTCIceCandidate): - await pc.addIceCandidate(obj) - elif obj is BYE: - print("Exiting") - break - - -time_start = None - - -def current_stamp(): - global time_start - - if time_start is None: - time_start = time.time() - return 0 - else: - return int((time.time() - time_start) * 1000000) - - -async def run_answer(pc, signaling): - await signaling.connect() - - @pc.on("datachannel") - def on_datachannel(channel): - channel_log(channel, "-", "created by remote party") - - @channel.on("message") - def on_message(message): - channel_log(channel, "<", message) - - if isinstance(message, str) and message.startswith("ping"): - # reply - channel_send(channel, "pong" + message[4:]) - - await consume_signaling(pc, signaling) - - -async def run_offer(pc, signaling): - await signaling.connect() - - channel = pc.createDataChannel("chat") - channel_log(channel, "-", "created by local party") - - async def send_pings(): - while True: - channel_send(channel, "ping %d" % current_stamp()) - await asyncio.sleep(1) - - @channel.on("open") - def on_open(): - asyncio.ensure_future(send_pings()) - - @channel.on("message") - def on_message(message): - channel_log(channel, "<", message) - - if isinstance(message, str) and message.startswith("pong"): - elapsed_ms = (current_stamp() - int(message[5:])) / 1000 - print(" RTT %.2f ms" % elapsed_ms) - - # send offer - await pc.setLocalDescription(await pc.createOffer()) - await signaling.send(pc.localDescription) - - await consume_signaling(pc, signaling) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Data channels ping/pong") - parser.add_argument("--verbose", "-v", action="count") - add_signaling_arguments(parser) - - args = parser.parse_args() - - if args.verbose: - logging.basicConfig(level=logging.INFO) - - signaling = create_signaling(args) - pc = RTCPeerConnection() - if args.role == "producer": - coro = run_offer(pc, signaling) - else: - coro = run_answer(pc, signaling) - - # run event loop - loop = asyncio.get_event_loop() - try: - loop.run_until_complete(coro) - except KeyboardInterrupt: - pass - finally: - loop.run_until_complete(pc.close()) - loop.run_until_complete(signaling.close()) diff --git a/src/example/datachannel-single-producer-multiple-consumer/README.md b/src/example/datachannel-single-producer-multiple-consumer/README.md index eeba226..4ebd69b 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/README.md +++ b/src/example/datachannel-single-producer-multiple-consumer/README.md @@ -7,11 +7,11 @@ This example shows how to use a single producer and multiple consumers. * Starts the producer: ```bash -python producer.py +python producer.py [-vv] ``` * Starts any number of consumers: ```bash -python consumer.py +python consumer.py [-vv] ``` \ No newline at end of file diff --git a/src/example/datachannel-single-producer-multiple-consumer/consumer.py b/src/example/datachannel-single-producer-multiple-consumer/consumer.py index c4c9fbe..74058c4 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/consumer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/consumer.py @@ -1,11 +1,24 @@ -from aiortc import RTCDataChannel import argparse import asyncio import logging -from gst_signalling import GstSession, GstSignallingConsumer +import os + +from gi.repository import Gst + +from gst_signalling import GstSignallingConsumer +from gst_signalling.gst_abstract_role import GstSession from gst_signalling.utils import find_producer_peer_id_by_name +def on_data_channel_message(data_channel, data: str) -> None: # type: ignore[no-untyped-def] + logging.info(f"Message from DataChannel: {data}") + data_channel.send_string("pong") + + +def on_data_channel_callback(webrtc: Gst.Element, data_channel) -> None: # type: ignore[no-untyped-def] + data_channel.connect("on-message-string", on_data_channel_message) + + def main(args: argparse.Namespace) -> None: peer_id = find_producer_peer_id_by_name( args.signaling_host, args.signaling_port, args.producer_name @@ -22,12 +35,7 @@ def main(args: argparse.Namespace) -> None: @consumer.on("new_session") # type: ignore[misc] def on_new_session(session: GstSession) -> None: pc = session.pc - - @pc.on("datachannel") # type: ignore[misc] - def on_datachannel(channel: RTCDataChannel) -> None: - @channel.on("message") # type: ignore[misc] - def on_message(message: str) -> None: - print("received message:", message) + pc.connect("on-data-channel", on_data_channel_callback) @consumer.on("close_session") # type: ignore[misc] def on_close_session(session: GstSession) -> None: @@ -65,5 +73,6 @@ async def run_consumer(consumer: GstSignallingConsumer) -> None: logging.basicConfig(level=logging.INFO) elif args.verbose > 1: logging.basicConfig(level=logging.DEBUG) + os.environ["GST_DEBUG"] = "4" main(args) diff --git a/src/example/datachannel-single-producer-multiple-consumer/producer.py b/src/example/datachannel-single-producer-multiple-consumer/producer.py index 2798241..02cac97 100644 --- a/src/example/datachannel-single-producer-multiple-consumer/producer.py +++ b/src/example/datachannel-single-producer-multiple-consumer/producer.py @@ -1,10 +1,23 @@ -import aiortc import argparse import asyncio import logging +import os import time -from gst_signalling import GstSession, GstSignallingProducer +import gi + +gi.require_version("Gst", "1.0") + +from gi.repository import GstWebRTC # noqa : E402 + +from gst_signalling import GstSignallingProducer # noqa : E402 +from gst_signalling.gst_abstract_role import GstSession # noqa : E402 + + +def on_data_channel_message( + data_channel: GstWebRTC.WebRTCDataChannel, data: str +) -> None: + logging.info(f"Message from DataChannel: {data}") def main(args: argparse.Namespace) -> None: @@ -14,26 +27,31 @@ def main(args: argparse.Namespace) -> None: name=args.name, ) + FREQ_HZ = 100 + @producer.on("new_session") # type: ignore[misc] def on_new_session(session: GstSession) -> None: - pc = session.pc + def on_open(channel: GstWebRTC.WebRTCDataChannel) -> None: + asyncio.run_coroutine_threadsafe(send_pings(channel), loop) - channel = pc.createDataChannel("chat") - - async def send_pings() -> None: + async def send_pings(channel: GstWebRTC.WebRTCDataChannel) -> None: try: t0 = time.time() while True: dt = time.time() - t0 - channel.send(f"ping: {dt:.1f}s") - await asyncio.sleep(1) - except aiortc.exceptions.InvalidStateError: - print("Channel closed") + channel.send_string(f"ping: {dt:.1f}s") + await asyncio.sleep(1.0 / FREQ_HZ) + except Exception as e: + logging.error(f"{e}") - @channel.on("open") # type: ignore[misc] - def on_open() -> None: - asyncio.ensure_future(send_pings()) + pc = session.pc + data_channel = pc.emit("create-data-channel", "chat", None) + if data_channel: + data_channel.connect("on-open", on_open) + data_channel.connect("on-message-string", on_data_channel_message) + else: + logging.error("Failed to create data channel") # run event loop loop = asyncio.get_event_loop() @@ -61,5 +79,6 @@ def on_open() -> None: logging.basicConfig(level=logging.INFO) elif args.verbose > 1: logging.basicConfig(level=logging.DEBUG) + os.environ["GST_DEBUG"] = "4" main(args) diff --git a/src/example/videostream-cli/README.rst b/src/example/videostream-cli/README.rst deleted file mode 100644 index 9fbdccf..0000000 --- a/src/example/videostream-cli/README.rst +++ /dev/null @@ -1,53 +0,0 @@ -Video channel CLI -================= - -This example illustrates the establishment of a video stream using an -RTCPeerConnection. - -It uses the gstreamer signaling mecanisms. - -By default the sent video is an animated French flag, but it is also possible -to use a MediaPlayer to read media from a file. - -This example also illustrates how to use a MediaRecorder to capture media to a -file. - -Running the example -------------------- - -First, run the gtreamer signaling server (see https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/tree/main/net/webrtc for details). - -.. code-block:: console - - $ WEBRTCSINK_SIGNALLING_SERVER_LOG=info gst-webrtc-signalling-server - -To run the example, you will need instances of the `cli` example: - -- The first takes on the role of the producer and sets its name to - `videostream-cli-producer`. - -.. code-block:: console - - $ python cli.py producer --name videostream-cli-producer - -- The second takes on the role of the consumer. - -.. code-block:: console - - $ python cli.py consumer --remote-producer-peer-name videostream-cli-producer - -Additional options ------------------- - -If you want to play a media file instead of sending the example image, run: - -.. code-block:: console - - $ python cli.py --play-from video.mp4 - -If you want to recording the received video you can run one of the following: - -.. code-block:: console - - $ python cli.py answer --record-to video.mp4 - $ python cli.py answer --record-to video-%3d.png diff --git a/src/example/videostream-cli/cli.py b/src/example/videostream-cli/cli.py deleted file mode 100644 index f6b4477..0000000 --- a/src/example/videostream-cli/cli.py +++ /dev/null @@ -1,168 +0,0 @@ -import argparse -import asyncio -import logging -import math - -import cv2 -import numpy -from aiortc import ( - RTCIceCandidate, - RTCPeerConnection, - RTCSessionDescription, - VideoStreamTrack, -) -from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder -from gst_signalling.aiortc_adapter import BYE, add_signaling_arguments, create_signaling -from av import VideoFrame - - -class FlagVideoStreamTrack(VideoStreamTrack): - """ - A video track that returns an animated flag. - """ - - def __init__(self): - super().__init__() # don't forget this! - self.counter = 0 - height, width = 480, 640 - - # generate flag - data_bgr = numpy.hstack( - [ - self._create_rectangle( - width=213, height=480, color=(255, 0, 0) - ), # blue - self._create_rectangle( - width=214, height=480, color=(255, 255, 255) - ), # white - self._create_rectangle(width=213, height=480, color=(0, 0, 255)), # red - ] - ) - - # shrink and center it - M = numpy.float32([[0.5, 0, width / 4], [0, 0.5, height / 4]]) - data_bgr = cv2.warpAffine(data_bgr, M, (width, height)) - - # compute animation - omega = 2 * math.pi / height - id_x = numpy.tile(numpy.array(range(width), dtype=numpy.float32), (height, 1)) - id_y = numpy.tile( - numpy.array(range(height), dtype=numpy.float32), (width, 1) - ).transpose() - - self.frames = [] - for k in range(30): - phase = 2 * k * math.pi / 30 - map_x = id_x + 10 * numpy.cos(omega * id_x + phase) - map_y = id_y + 10 * numpy.sin(omega * id_x + phase) - self.frames.append( - VideoFrame.from_ndarray( - cv2.remap(data_bgr, map_x, map_y, cv2.INTER_LINEAR), format="bgr24" - ) - ) - - async def recv(self): - pts, time_base = await self.next_timestamp() - - frame = self.frames[self.counter % 30] - frame.pts = pts - frame.time_base = time_base - self.counter += 1 - return frame - - def _create_rectangle(self, width, height, color): - data_bgr = numpy.zeros((height, width, 3), numpy.uint8) - data_bgr[:, :] = color - return data_bgr - - -async def run(pc, player, recorder, signaling, role): - def add_tracks(): - if player and player.audio: - pc.addTrack(player.audio) - - if player and player.video: - pc.addTrack(player.video) - else: - pc.addTrack(FlagVideoStreamTrack()) - - @pc.on("track") - def on_track(track): - print("Receiving %s" % track.kind) - recorder.addTrack(track) - - # connect signaling - await signaling.connect() - - if role == "producer": - # send offer - add_tracks() - await pc.setLocalDescription(await pc.createOffer()) - await signaling.send(pc.localDescription) - - # consume signaling - while True: - obj = await signaling.receive() - - if isinstance(obj, RTCSessionDescription): - await pc.setRemoteDescription(obj) - await recorder.start() - - if obj.type == "offer": - # send answer - add_tracks() - await pc.setLocalDescription(await pc.createAnswer()) - await signaling.send(pc.localDescription) - elif isinstance(obj, RTCIceCandidate): - await pc.addIceCandidate(obj) - elif obj is BYE: - print("Exiting") - break - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Video stream from the command line") - parser.add_argument("--play-from", help="Read the media from a file and sent it."), - parser.add_argument("--record-to", help="Write received media to a file."), - parser.add_argument("--verbose", "-v", action="count") - add_signaling_arguments(parser) - args = parser.parse_args() - - if args.verbose: - logging.basicConfig(level=logging.INFO) - - # create signaling and peer connection - signaling = create_signaling(args) - pc = RTCPeerConnection() - - # create media source - if args.play_from: - player = MediaPlayer(args.play_from) - else: - player = None - - # create media sink - if args.record_to: - recorder = MediaRecorder(args.record_to) - else: - recorder = MediaBlackhole() - - # run event loop - loop = asyncio.get_event_loop() - try: - loop.run_until_complete( - run( - pc=pc, - player=player, - recorder=recorder, - signaling=signaling, - role=args.role, - ) - ) - except KeyboardInterrupt: - pass - finally: - # cleanup - loop.run_until_complete(recorder.stop()) - loop.run_until_complete(signaling.close()) - loop.run_until_complete(pc.close()) diff --git a/src/gst_signalling/__init__.py b/src/gst_signalling/__init__.py index 74b2f28..3851f3f 100644 --- a/src/gst_signalling/__init__.py +++ b/src/gst_signalling/__init__.py @@ -1,5 +1,4 @@ -from .aiortc_adapter import GstSignalingForAiortc # noqa: F401 -from .gst_abstract_role import GstSession # noqa: F401 +# from .gst_abstract_role import GstSession # noqa: F401 from .gst_consumer import GstSignallingConsumer # noqa: F401 from .gst_listener import GstSignallingListener # noqa: F401 from .gst_producer import GstSignallingProducer # noqa: F401 diff --git a/src/gst_signalling/aiortc_adapter.py b/src/gst_signalling/aiortc_adapter.py deleted file mode 100644 index 6b5686b..0000000 --- a/src/gst_signalling/aiortc_adapter.py +++ /dev/null @@ -1,246 +0,0 @@ -from aiortc import RTCIceCandidate, RTCSessionDescription -from aiortc.contrib.signaling import object_from_string, object_to_string -from aiortc.sdp import candidate_from_sdp -import argparse -import asyncio -import json -import logging -from typing import Any, Optional, Union - -from .gst_signalling import GstSignalling -from .utils import find_producer_peer_id_by_name - - -class GstSignalingForAiortc: - """Gstreamer signalling for aiortc. - - This class is a wrapper around GstSignalling that provides a simple interface, - that should be mostly compatible with aiortc examples. - """ - - def __init__( - self, - signaling_host: str, - signaling_port: int, - role: str, - name: str, - remote_producer_peer_id: Optional[str] = None, - ): - """Initializes the signalling peer. - - Args: - signaling_host (str): Hostname of the signalling server. - signaling_port (int): Port of the signalling server. - role (str): Signalling role (consumer or producer). - name (str): Peer name. - remote_producer_peer_id (Optional[str], optional): Producer peer_id (required in consumer role!). - """ - self.logger = logging.getLogger(__name__) - self.signalling = GstSignalling(signaling_host, signaling_port) - - if role not in ("consumer", "producer"): - raise ValueError("invalid role {role}") - self.role = role - - self.name = name - - self.peer_id: Optional[str] = None - self.peer_id_evt = asyncio.Event() - - self.session_id: Optional[str] = None - self.session_id_evt = asyncio.Event() - - self.peer_msg_queue: asyncio.Queue[ - Union[RTCSessionDescription, RTCIceCandidate, object] - ] = asyncio.Queue() - - self._setup(remote_producer_peer_id) - - def _setup(self, remote_producer_peer_id: Optional[str] = None) -> None: - @self.signalling.on("Welcome") # type: ignore[arg-type] - def on_welcome(peer_id: str) -> None: - self.logger.info(f"Welcome received, peer_id: {peer_id}") - self.peer_id = peer_id - self.peer_id_evt.set() - - if self.role == "producer": - self._setup_producer() - elif self.role == "consumer": - if remote_producer_peer_id is None: - raise ValueError( - "In consumer role, you have to specify the remote-producer-peer-id!" - ) - self._setup_consumer(remote_producer_peer_id) - - @self.signalling.on("Peer") # type: ignore[arg-type] - async def on_peer(session_id: str, message: dict[str, Any]) -> None: - assert self.session_id == session_id - obj = self._parse_peer_message(message) - if obj is not None: - await self.peer_msg_queue.put(obj) - - @self.signalling.on("EndSession") # type: ignore[arg-type] - async def on_end_session(session_id: str) -> None: - await self.peer_msg_queue.put(BYE) - - @self.signalling.on("Error") # type: ignore[arg-type] - async def on_error(details: str) -> None: - self.logger.error(f'Connection closed with error: "{details}"') - await self.peer_msg_queue.put(BYE) - - def _parse_peer_message( - self, message: dict[str, Any] - ) -> Optional[Union[RTCSessionDescription, RTCIceCandidate]]: - if "sdp" in message: - message = message["sdp"] - elif "ice" in message: - message = message["ice"] - else: - raise ValueError(f"Invalid message {message}") - - if "type" in message: - obj = object_from_string(json.dumps(message)) - return obj - elif "candidate" in message: - if message["candidate"] == "": - self.logger.info("Received empty candidate, ignoring") - return None - - obj = candidate_from_sdp(message["candidate"].split(":", 1)[1]) - obj.sdpMLineIndex = message["sdpMLineIndex"] - return obj - else: - self.logger.error(f"Failed to parse message: {message}") - return None - - def _setup_producer(self) -> None: - @self.signalling.on("StartSession") # type: ignore[arg-type] - def on_start_session(peer_id: str, session_id: str) -> None: - self.logger.info( - f"StartSession received, peer_id: {peer_id}, session_id: {session_id}" - ) - self.session_id = session_id - self.session_id_evt.set() - - def _setup_consumer(self, remote_producer_peer_id: str) -> None: - self.remote_producer_peer_id = remote_producer_peer_id - - @self.signalling.on("SessionStarted") # type: ignore[arg-type] - def on_session_started(peer_id: str, session_id: str) -> None: - self.logger.info( - f"SessionStarted received, peer_id: {peer_id}, session_id: {session_id}" - ) - self.session_id = session_id - self.session_id_evt.set() - - async def connect(self) -> None: - """Connects to the signalling server. - - This method will block until the connection is established (meaning we received a PeerID and a SessionID). - This method has to be called before any other method. - """ - await self.signalling.connect() - await self.peer_id_evt.wait() - - if self.role == "producer": - await self.signalling.set_peer_status(roles=["producer"], name=self.name) - - elif self.role == "consumer": - await self.signalling.start_session(peer_id=self.remote_producer_peer_id) - - await self.session_id_evt.wait() - - self.logger.info( - f"Connected, peer_id: {self.peer_id}, session_id: {self.session_id}" - ) - - async def close(self) -> None: - """Closes the connection to the signalling server.""" - await self.signalling.close() - - async def send( - self, message: Union[RTCIceCandidate, RTCSessionDescription] - ) -> None: - """Sends a message to the other peer (SDP or ICECandidate). - - Args: - message (Union[RTCIceCandidate, RTCSessionDescription]): Message to send. - """ - data = json.loads(object_to_string(message)) - assert self.session_id is not None - - if isinstance(message, RTCSessionDescription): - await self.signalling.send_peer_message(self.session_id, "sdp", data) - elif isinstance(message, RTCIceCandidate): - await self.signalling.send_peer_message(self.session_id, "ice", data) - else: - raise ValueError(f"Invalid message type {type(message)}") - - async def receive(self) -> Union[RTCIceCandidate, RTCSessionDescription]: - """Receives a message from the other peer (SDP or ICECandidate).""" - obj = await self.peer_msg_queue.get() - self.logger.info(f"Received peer message: {obj}") - return obj - - -BYE = object() - - -def create_signaling(args: argparse.Namespace) -> GstSignalingForAiortc: - """Creates a GstSignalingForAiortc instance from command line arguments.""" - if args.role == "consumer": - if ( - args.remote_producer_peer_id is None - and args.remote_producer_peer_name is None - ): - raise ValueError( - "In consumer role, you have to specify the remote-producer-peer-id or remote-producer-peer-name!" - ) - elif args.remote_producer_peer_id is not None: - remote_producer_peer_id = args.remote_producer_peer_id - else: - remote_producer_peer_id = find_producer_peer_id_by_name( - host=args.signaling_host, - port=args.signaling_port, - name=args.remote_producer_peer_name, - ) - else: - remote_producer_peer_id = None - - return GstSignalingForAiortc( - signaling_host=args.signaling_host, - signaling_port=args.signaling_port, - role=args.role, - name=args.name, - remote_producer_peer_id=remote_producer_peer_id, - ) - - -def add_signaling_arguments(parser: argparse.ArgumentParser) -> None: - """Adds command line arguments for GstSignalingForAiortc. - - * signaling-host: Hostname of the signalling server. - * signaling-port: Port of the signalling server. - * role: Signalling role (consumer or producer). - * name: Peer name. - * remote-producer-peer-id: Producer peer_id (required in consumer role!). - """ - parser.add_argument( - "--signaling-host", default="127.0.0.1", help="Gstreamer signaling host" - ) - parser.add_argument( - "--signaling-port", default=8443, help="Gstreamer signaling port" - ) - parser.add_argument("role", choices=["consumer", "producer"], help="Signaling role") - parser.add_argument("--name", default="aiortc-peer", help="peer name") - parser.add_argument( - "--remote-producer-peer-id", - type=str, - help="producer peer_id (in consumer role, either set this or remote-producer-peer-name!)", - ) - parser.add_argument( - "--remote-producer-peer-name", - type=str, - default="aiortc-peer", - help="producer peer_name (in consumer role, either set this or remote-producer-peer-id!)", - ) diff --git a/src/gst_signalling/gst_abstract_role.py b/src/gst_signalling/gst_abstract_role.py index bd7a53a..1d0e2c4 100644 --- a/src/gst_signalling/gst_abstract_role.py +++ b/src/gst_signalling/gst_abstract_role.py @@ -1,37 +1,33 @@ -from aiortc import ( - RTCIceCandidate, - RTCPeerConnection, - RTCSessionDescription, -) -from aiortc.contrib.signaling import object_from_string, object_to_string import asyncio -import json import logging -import pyee from typing import Any, Dict, NamedTuple, Optional -from aiortc.sdp import candidate_from_sdp - +import gi +from pyee.asyncio import AsyncIOEventEmitter from .gst_signalling import GstSignalling +gi.require_version("Gst", "1.0") +gi.require_version("GstWebRTC", "1.0") + +from gi.repository import Gst # noqa : E402 GstSession = NamedTuple( "GstSession", [ ("peer_id", str), - ("pc", RTCPeerConnection), + ("pc", Gst.Element), # type '__gi__.GstWebRTCBin' ], ) -class GstSignallingAbstractRole(pyee.AsyncIOEventEmitter): +class GstSignallingAbstractRole(AsyncIOEventEmitter): def __init__( self, host: str, port: int, ) -> None: - pyee.AsyncIOEventEmitter.__init__(self) # type: ignore[no-untyped-call] + super().__init__() self.logger = logging.getLogger(__name__) @@ -39,6 +35,7 @@ def __init__( self.peer_id: Optional[str] = None self.peer_id_evt = asyncio.Event() + self._asyncloop = asyncio.get_event_loop() self.sessions: Dict[str, GstSession] = {} @@ -71,6 +68,45 @@ async def on_end_session(session_id: str) -> None: self.signalling = signalling + Gst.init(None) + + self._pipeline = Gst.Pipeline.new() + + def __del__(self) -> None: + Gst.deinit() + + def make_send_sdp( + self, sdp: Any, type: str, session_id: str + ) -> None: # sdp is GstWebRTC.WebRTCSessionDescription + text = sdp.sdp.as_text() + msg = {"type": type, "sdp": text} + asyncio.run_coroutine_threadsafe( + self.send_sdp(session_id, msg), self._asyncloop + ) + + def send_ice_candidate_message( + self, _: Gst.Element, mlineindex: int, candidate: str, session_id: str + ) -> None: + icemsg = {"candidate": candidate, "sdpMLineIndex": mlineindex} + asyncio.run_coroutine_threadsafe( + self.send_ice(session_id, icemsg), self._asyncloop + ) + + def init_webrtc(self, session_id: str) -> Gst.Element: + webrtc = Gst.ElementFactory.make("webrtcbin") + assert webrtc + + webrtc.set_property("bundle-policy", "max-bundle") + # webrtc.set_property("stun-server", None) + # webrtc.set_property("turn-server", None) + + webrtc.connect("on-ice-candidate", self.send_ice_candidate_message, session_id) + + self._pipeline.set_state(Gst.State.READY) + self._pipeline.add(webrtc) + + return webrtc + async def connect(self) -> None: assert self.signalling is not None @@ -86,67 +122,37 @@ async def consume(self) -> None: # Session management async def setup_session(self, session_id: str, peer_id: str) -> GstSession: - pc = RTCPeerConnection() + self.logger.info("setup session") + pc = self.init_webrtc(session_id) session = GstSession(peer_id, pc) - self.sessions[session_id] = session - self.emit("new_session", session) + self.sessions[session_id] = session return session async def peer_for_session( - self, session_id: str, message: Dict[str, Dict[str, Any]] + self, session_id: str, message: Dict[str, Dict[str, str]] ) -> None: - session = self.sessions[session_id] - pc = session.pc - - if "sdp" in message: - obj = object_from_string(json.dumps(message["sdp"])) - - if isinstance(obj, RTCSessionDescription): - if obj.type == "offer": - self.logger.info("Received offer sdp") - await pc.setRemoteDescription(obj) - - self.logger.info("Sending answer") - await pc.setLocalDescription(await pc.createAnswer()) - - self.logger.info("Sending answer sdp") - await self.send_sdp(session_id, pc.localDescription) + self.logger.info(f"peer for session {session_id} {message}") - elif obj.type == "answer": - self.logger.info("Received answer sdp") - await pc.setRemoteDescription(obj) - - elif "ice" in message: - if "type" in message and str(message["type"]) == "candidate": - obj = object_from_string(json.dumps(message["ice"])) - - if isinstance(obj, RTCIceCandidate): - self.logger.info(f"Received ice candidate {obj}") - await pc.addIceCandidate(obj) - - else: - message = message["ice"] - if str(message["candidate"]) == "": - self.logger.info("Received empty candidate, ignoring") - return None - - obj = candidate_from_sdp(str(message["candidate"]).split(":", 1)[1]) - obj.sdpMLineIndex = message["sdpMLineIndex"] - - self.logger.info(f"Received ice candidate {obj}") - await pc.addIceCandidate(obj) + def handle_ice_message(self, webrtc: Gst.Element, ice_msg: Dict[str, Any]) -> None: + candidate = ice_msg["candidate"] + sdpmlineindex = ice_msg["sdpMLineIndex"] + webrtc.emit("add-ice-candidate", sdpmlineindex, candidate) async def close_session(self, session_id: str) -> None: + self.logger.info("close session") + """ session = self.sessions.pop(session_id) self.emit("close_session", session) await session.pc.close() + """ - async def send_sdp(self, session_id: str, sdp: RTCSessionDescription) -> None: - await self.signalling.send_peer_message( - session_id, "sdp", json.loads(object_to_string(sdp)) - ) + async def send_sdp(self, session_id: str, sdp: Dict[str, Dict[str, str]]) -> None: + await self.signalling.send_peer_message(session_id, "sdp", sdp) + + async def send_ice(self, session_id: str, ice: Dict[str, Any]) -> None: + await self.signalling.send_peer_message(session_id, "ice", ice) diff --git a/src/gst_signalling/gst_consumer.py b/src/gst_signalling/gst_consumer.py index 8d61ad0..d8cddfe 100644 --- a/src/gst_signalling/gst_consumer.py +++ b/src/gst_signalling/gst_consumer.py @@ -1,4 +1,15 @@ -from .gst_abstract_role import GstSignallingAbstractRole +import logging +from typing import Dict + +import gi + +gi.require_version("Gst", "1.0") +gi.require_version("GstWebRTC", "1.0") +gi.require_version("GstSdp", "1.0") + +from gi.repository import Gst, GstSdp, GstWebRTC # noqa : E402 + +from .gst_abstract_role import GstSession, GstSignallingAbstractRole # noqa : E402 class GstSignallingConsumer(GstSignallingAbstractRole): @@ -9,8 +20,70 @@ def __init__( producer_peer_id: str, ) -> None: super().__init__(host, port) + self.logger = logging.getLogger(__name__) self.producer_peer_id = producer_peer_id async def connect(self) -> None: await super().connect() await self.signalling.start_session(self.producer_peer_id) + self.logger.info("connect") + + async def setup_session(self, session_id: str, peer_id: str) -> GstSession: + session = await super().setup_session(session_id, peer_id) + self.logger.info("setup session consumer") + + self._pipeline.set_state(Gst.State.PLAYING) + self.emit("new_session", session) + + return session + + def on_answer_created( + self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str + ) -> None: + assert promise.wait() == Gst.PromiseResult.REPLIED + reply = promise.get_reply() + # answer = reply["answer"] + answer = reply.get_value("answer") # type: ignore[union-attr] + promise = Gst.Promise.new() + webrtc.emit("set-local-description", answer, promise) + promise.interrupt() # we don't care about the result, discard it + self.make_send_sdp(answer, "answer", session_id) + + def on_offer_set( + self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str + ) -> None: + assert promise.wait() == Gst.PromiseResult.REPLIED + promise = Gst.Promise.new_with_change_func( + self.on_answer_created, webrtc, session_id + ) + webrtc.emit("create-answer", None, promise) + + async def peer_for_session( + self, session_id: str, message: Dict[str, Dict[str, str]] + ) -> None: + self.logger.info(f"peer for session {session_id} {message}") + + session = self.sessions[session_id] + webrtc = session.pc + + if "sdp" in message: + if message["sdp"]["type"] == "offer": + _, sdpmsg = GstSdp.SDPMessage.new_from_text(message["sdp"]["sdp"]) + sdp_type = GstWebRTC.WebRTCSDPType.OFFER + offer = GstWebRTC.WebRTCSessionDescription.new(sdp_type, sdpmsg) + promise = Gst.Promise.new_with_change_func( + self.on_offer_set, webrtc, session_id + ) + webrtc.emit("set-remote-description", offer, promise) + self.logger.debug("set remote desc done") + + elif message["sdp"]["type"] == "answer": + self.logger.warning("Consumer should not receive the answer") + else: + self.logger.error(f"SDP not properly formatted {message['sdp']}") + + elif "ice" in message: + self.handle_ice_message(webrtc, message["ice"]) + + else: + self.logger.error(f"message not processed {message}") diff --git a/src/gst_signalling/gst_producer.py b/src/gst_signalling/gst_producer.py index a37c453..07f9e31 100644 --- a/src/gst_signalling/gst_producer.py +++ b/src/gst_signalling/gst_producer.py @@ -1,15 +1,16 @@ +import logging +from typing import Dict + +from gi.repository import Gst, GstSdp, GstWebRTC + from .gst_abstract_role import GstSession, GstSignallingAbstractRole class GstSignallingProducer(GstSignallingAbstractRole): - def __init__( - self, - host: str, - port: int, - name: str, - ) -> None: + def __init__(self, host: str, port: int, name: str) -> None: super().__init__(host, port) self.name = name + self.logger = logging.getLogger(__name__) async def connect(self) -> None: await super().connect() @@ -19,12 +20,66 @@ async def serve4ever(self) -> None: await self.connect() await self.consume() + def on_offer_created( + self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str + ) -> None: + self.logger.debug(f"on offer created {promise} {webrtc} {session_id}") + assert promise.wait() == Gst.PromiseResult.REPLIED + reply = promise.get_reply() + offer = reply.get_value("offer") # type: ignore[union-attr] + + promise = Gst.Promise.new() + self.logger.info("Offer created, setting local description") + webrtc.emit("set-local-description", offer, promise) + promise.interrupt() + self.make_send_sdp(offer, "offer", session_id) + + def on_negotiation_needed(self, element: Gst.Element, session_id: str) -> None: + self.logger.debug(f"on negociation needed {element} {session_id}") + promise = Gst.Promise.new_with_change_func( + self.on_offer_created, element, session_id + ) + element.emit("create-offer", None, promise) + async def setup_session(self, session_id: str, peer_id: str) -> GstSession: session = await super().setup_session(session_id, peer_id) + self.logger.info("setup session producer") # send offer pc = session.pc - await pc.setLocalDescription(await pc.createOffer()) - await self.send_sdp(session_id, pc.localDescription) + pc.connect("on-negotiation-needed", self.on_negotiation_needed, session_id) + + self._pipeline.set_state(Gst.State.PLAYING) + + self.emit("new_session", session) return session + + async def peer_for_session( + self, session_id: str, message: Dict[str, Dict[str, str]] + ) -> None: + self.logger.info(f"peer for session {session_id} {message}") + + session = self.sessions[session_id] + webrtc = session.pc + + if "sdp" in message: + if message["sdp"]["type"] == "answer": + self.logger.debug("set remote desc") + _, sdpmsg = GstSdp.SDPMessage.new_from_text(message["sdp"]["sdp"]) + + sdp_type = GstWebRTC.WebRTCSDPType.ANSWER + answer = GstWebRTC.WebRTCSessionDescription.new(sdp_type, sdpmsg) + + promise = Gst.Promise.new() + webrtc.emit("set-remote-description", answer, promise) + promise.interrupt() + self.logger.debug("set remote desc done") + elif message["sdp"]["type"] == "offer": + self.logger.warning("producer should not receive the offer") + else: + self.logger.error(f"SDP not properly formatted {message['sdp']}") + elif "ice" in message: + self.handle_ice_message(webrtc, message["ice"]) + else: + self.logger.error(f"message not processed {message}") diff --git a/src/gst_signalling/gst_signalling.py b/src/gst_signalling/gst_signalling.py index 3b9acc7..d459021 100644 --- a/src/gst_signalling/gst_signalling.py +++ b/src/gst_signalling/gst_signalling.py @@ -1,12 +1,13 @@ import asyncio import json import logging -import pyee from typing import Any, Dict, List, Optional -from websockets.legacy.client import connect, WebSocketClientProtocol +from pyee.asyncio import AsyncIOEventEmitter +from websockets.legacy.client import WebSocketClientProtocol, connect -class GstSignalling(pyee.AsyncIOEventEmitter): + +class GstSignalling(AsyncIOEventEmitter): """Signalling peer for the GStreamer WebRTC implementation. This class is used to communicate with a GStreamer WebRTC signalling server. @@ -47,7 +48,7 @@ def __init__(self, host: str, port: int) -> None: Args: host (str): Hostname of the signalling server. port (int): Port of the signalling server.""" - pyee.AsyncIOEventEmitter.__init__(self) # type: ignore[no-untyped-call] + AsyncIOEventEmitter.__init__(self) self.logger = logging.getLogger(__name__) @@ -201,7 +202,7 @@ async def end_session(self, session_id: str) -> None: await self._send(message) async def send_peer_message( - self, session_id: str, type: str, peer_message: str + self, session_id: str, type: str, peer_message: Dict[str, Any] ) -> None: """Sends a message to a peer the sender is currently in session with. diff --git a/src/gst_signalling/utils.py b/src/gst_signalling/utils.py index b783a06..c62df7d 100644 --- a/src/gst_signalling/utils.py +++ b/src/gst_signalling/utils.py @@ -1,5 +1,6 @@ import asyncio from typing import Dict +import argparse from .gst_signalling import GstSignalling @@ -55,3 +56,33 @@ def find_producer_peer_id_by_name(host: str, port: int, name: str) -> str: return producer_id raise KeyError(f"Producer {name} not found.") + + +def add_signaling_arguments(parser: argparse.ArgumentParser) -> None: + """Adds command line arguments for GstSignalingForAiortc. + + * signaling-host: Hostname of the signalling server. + * signaling-port: Port of the signalling server. + * role: Signalling role (consumer or producer). + * name: Peer name. + * remote-producer-peer-id: Producer peer_id (required in consumer role!). + """ + parser.add_argument( + "--signaling-host", default="127.0.0.1", help="Gstreamer signaling host" + ) + parser.add_argument( + "--signaling-port", default=8443, help="Gstreamer signaling port" + ) + parser.add_argument("role", choices=["consumer", "producer"], help="Signaling role") + parser.add_argument("--name", default="aiortc-peer", help="peer name") + parser.add_argument( + "--remote-producer-peer-id", + type=str, + help="producer peer_id (in consumer role, either set this or remote-producer-peer-name!)", + ) + parser.add_argument( + "--remote-producer-peer-name", + type=str, + default="aiortc-peer", + help="producer peer_name (in consumer role, either set this or remote-producer-peer-id!)", + ) diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/test_import.py b/tests/test_import.py deleted file mode 100644 index df6ef81..0000000 --- a/tests/test_import.py +++ /dev/null @@ -1,3 +0,0 @@ -def test_import(): - from gst_signalling.gst_signalling import GstSignalling - from gst_signalling.aiortc_adapter import GstSignalingForAiortc