Skip to content

Commit

Permalink
enhancement #36: save draft code
Browse files Browse the repository at this point in the history
  • Loading branch information
FabienDanieau committed Jan 21, 2024
1 parent bac4403 commit 268a217
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 76 deletions.
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ install_requires =
numpy
pyee
websockets
PyGObject==3.42.2

[options.packages.find]
where=src
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import argparse
import asyncio
import logging
import os

from aiortc import RTCDataChannel

from gst_signalling import GstSession, GstSignallingConsumer
from gst_signalling import GstSignallingConsumer
from gst_signalling.utils import find_producer_peer_id_by_name


Expand All @@ -21,6 +22,7 @@ def main(args: argparse.Namespace) -> None:
producer_peer_id=peer_id,
)

"""
@consumer.on("new_session") # type: ignore[misc]
def on_new_session(session: GstSession) -> None:
pc = session.pc
Expand All @@ -35,6 +37,7 @@ def on_message(message: str) -> None:
@consumer.on("close_session") # type: ignore[misc]
def on_close_session(session: GstSession) -> None:
close_evt.set()
"""

async def run_consumer(consumer: GstSignallingConsumer) -> None:
await consumer.connect()
Expand Down Expand Up @@ -68,5 +71,6 @@ async def run_consumer(consumer: GstSignallingConsumer) -> None:
logging.basicConfig(level=logging.INFO)
elif args.verbose > 1:
logging.basicConfig(level=logging.DEBUG)
os.environ["GST_DEBUG"] = "4"

main(args)
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import argparse
import asyncio
import logging
import os
import time

import aiortc

from gst_signalling import GstSession, GstSignallingProducer
from gst_signalling import GstSignallingProducer


def main(args: argparse.Namespace) -> None:
Expand All @@ -15,6 +16,9 @@ def main(args: argparse.Namespace) -> None:
name=args.name,
)

freq_hz = 10

"""
@producer.on("new_session") # type: ignore[misc]
def on_new_session(session: GstSession) -> None:
pc = session.pc
Expand All @@ -32,14 +36,14 @@ async def send_pings() -> None:
while True:
dt = time.time() - t0
channel.send(f"ping: {dt:.1f}s")
await asyncio.sleep(1)
await asyncio.sleep(1.0 / freq_hz)
except aiortc.exceptions.InvalidStateError:
print("Channel closed")
@channel.on("open") # type: ignore[misc]
def on_open() -> None:
asyncio.ensure_future(send_pings())

"""
# run event loop
loop = asyncio.get_event_loop()
try:
Expand All @@ -66,5 +70,6 @@ def on_open() -> None:
logging.basicConfig(level=logging.INFO)
elif args.verbose > 1:
logging.basicConfig(level=logging.DEBUG)
os.environ["GST_DEBUG"] = "4"

main(args)
3 changes: 2 additions & 1 deletion src/gst_signalling/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .aiortc_adapter import GstSignalingForAiortc # noqa: F401
from .gst_abstract_role import GstSession # noqa: F401

# from .gst_abstract_role import GstSession # noqa: F401
from .gst_consumer import GstSignallingConsumer # noqa: F401
from .gst_listener import GstSignallingListener # noqa: F401
from .gst_producer import GstSignallingProducer # noqa: F401
126 changes: 67 additions & 59 deletions src/gst_signalling/gst_abstract_role.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
from aiortc import (
RTCIceCandidate,
RTCPeerConnection,
RTCSessionDescription,
)
from aiortc.contrib.signaling import object_from_string, object_to_string
import asyncio
import json
import logging
import pyee
from typing import Any, Dict, NamedTuple, Optional

from aiortc.sdp import candidate_from_sdp

import gi
import pyee

from .gst_signalling import GstSignalling

gi.require_version("Gst", "1.0")
gi.require_version("GstWebRTC", "1.0")

from gi.repository import Gst

GstSession = NamedTuple(
"GstSession",
[
("peer_id", str),
("pc", RTCPeerConnection),
("pc", Gst.Element), # type '__gi__.GstWebRTCBin'
],
)

Expand All @@ -39,6 +35,7 @@ def __init__(

self.peer_id: Optional[str] = None
self.peer_id_evt = asyncio.Event()
self._asyncloop = asyncio.get_event_loop()

self.sessions: Dict[str, GstSession] = {}

Expand Down Expand Up @@ -71,6 +68,52 @@ async def on_end_session(session_id: str) -> None:

self.signalling = signalling

Gst.init(None)

self._pipeline = Gst.Pipeline.new()

def __del__(self) -> None:
Gst.deinit()

def make_send_sdp(self, sdp, type: str, session_id: str): # type: ignore[no-untyped-def]
self.logger.debug(f"send sdp {type} {sdp}")
text = sdp.sdp.as_text()
msg = {"type": type, "sdp": text}
asyncio.run_coroutine_threadsafe(
self.send_sdp(session_id, msg), self._asyncloop
)

def send_ice_candidate_message(self, _, mlineindex, candidate): # type: ignore[no-untyped-def]
icemsg = {"ice": {"candidate": candidate, "sdpMLineIndex": mlineindex}}
self.logger.debug(icemsg)
# loop = asyncio.new_event_loop()
# loop.run_until_complete(self.send_sdp(session_id, icemsg))

def init_webrtc(self, session_id: str): # type: ignore[no-untyped-def]
webrtc = Gst.ElementFactory.make("webrtcbin")
assert webrtc

# webrtc.set_property("bundle-policy", "max-bundle")
# webrtc.set_property("stun-server", "stun://stun.l.google.com:19302")
# webrtc.set_property(
# "turn-server",
# "turn://gstreamer:[email protected]:3478",
# )
webrtc.set_property("stun-server", None)
webrtc.set_property("turn-server", None)

# webrtc.connect("on-ice-candidate", lambda *args: self._ices.append(args))
# webrtc.connect("on-negotiation-needed", self.on_negotiation_needed, session_id)
webrtc.connect("on-ice-candidate", self.send_ice_candidate_message)
# webrtc.connect(
# "notify::ice-gathering-state", self.on_ice_gathering_state_notify
# )

self._pipeline.set_state(Gst.State.READY)
self._pipeline.add(webrtc)

return webrtc

async def connect(self) -> None:
assert self.signalling is not None

Expand All @@ -86,67 +129,32 @@ async def consume(self) -> None:

# Session management
async def setup_session(self, session_id: str, peer_id: str) -> GstSession:
pc = RTCPeerConnection()
self.logger.info("setup session")
pc = self.init_webrtc(session_id)

session = GstSession(peer_id, pc)
self.sessions[session_id] = session

self.emit("new_session", session)
self.sessions[session_id] = session

return session

async def peer_for_session(
self, session_id: str, message: Dict[str, Dict[str, Any]]
self, session_id: str, message: Dict[str, Dict[str, str]]
) -> None:
session = self.sessions[session_id]
pc = session.pc

if "sdp" in message:
obj = object_from_string(json.dumps(message["sdp"]))

if isinstance(obj, RTCSessionDescription):
if obj.type == "offer":
self.logger.info("Received offer sdp")
await pc.setRemoteDescription(obj)

self.logger.info("Sending answer")
await pc.setLocalDescription(await pc.createAnswer())

self.logger.info("Sending answer sdp")
await self.send_sdp(session_id, pc.localDescription)

elif obj.type == "answer":
self.logger.info("Received answer sdp")
await pc.setRemoteDescription(obj)

elif "ice" in message:
if "type" in message and str(message["type"]) == "candidate":
obj = object_from_string(json.dumps(message["ice"]))

if isinstance(obj, RTCIceCandidate):
self.logger.info(f"Received ice candidate {obj}")
await pc.addIceCandidate(obj)

else:
message = message["ice"]
if str(message["candidate"]) == "":
self.logger.info("Received empty candidate, ignoring")
return None

obj = candidate_from_sdp(str(message["candidate"]).split(":", 1)[1])
obj.sdpMLineIndex = message["sdpMLineIndex"]

self.logger.info(f"Received ice candidate {obj}")
await pc.addIceCandidate(obj)
self.logger.info(f"peer for session {session_id} {message}")

async def close_session(self, session_id: str) -> None:
self.logger.info("close session")
"""
session = self.sessions.pop(session_id)
self.emit("close_session", session)
await session.pc.close()
"""

async def send_sdp(self, session_id: str, sdp: RTCSessionDescription) -> None:
await self.signalling.send_peer_message(
session_id, "sdp", json.loads(object_to_string(sdp))
)
async def send_sdp(self, session_id: str, sdp: Dict[str, Dict[str, str]]) -> None:
await self.signalling.send_peer_message(session_id, "sdp", sdp)

async def send_ice(self, session_id: str, ice: Dict[str, Dict[str, str]]) -> None:
await self.signalling.send_peer_message(session_id, "ice", ice)
75 changes: 74 additions & 1 deletion src/gst_signalling/gst_consumer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
from .gst_abstract_role import GstSignallingAbstractRole
import logging
from typing import Dict

from gi.repository import Gst, GstSdp, GstWebRTC

from .gst_abstract_role import GstSession, GstSignallingAbstractRole

"""
try:
from gi.overrides import Gst as _
except ImportError:
print("gstreamer-python binding overrides aren't available, please install them")
raise
"""


class GstSignallingConsumer(GstSignallingAbstractRole):
Expand All @@ -9,8 +22,68 @@ def __init__(
producer_peer_id: str,
) -> None:
super().__init__(host, port)
self.logger = logging.getLogger(__name__)
self.producer_peer_id = producer_peer_id

async def connect(self) -> None:
await super().connect()
await self.signalling.start_session(self.producer_peer_id)
self.logger.info("connect")

async def setup_session(self, session_id: str, peer_id: str) -> GstSession:
session = await super().setup_session(session_id, peer_id)
self.logger.info("setup session consumer")

self._pipeline.set_state(Gst.State.PLAYING)
self.emit("new_session", session)

return session

def on_answer_created(
self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str
) -> None:
assert promise.wait() == Gst.PromiseResult.REPLIED
reply = promise.get_reply()
answer = reply["answer"]
promise = Gst.Promise.new()
webrtc.emit("set-local-description", answer, promise)
promise.interrupt() # we don't care about the result, discard it
# self.send_sdp(answer)
self.logger.debug(f"here {answer}")
self.make_send_sdp(answer, "answer", session_id)

def on_offer_set(
self, promise: Gst.Promise, webrtc: Gst.Element, session_id: str
) -> None:
assert promise.wait() == Gst.PromiseResult.REPLIED
promise = Gst.Promise.new_with_change_func(
self.on_answer_created, webrtc, session_id
)
webrtc.emit("create-answer", None, promise)

async def peer_for_session(
self, session_id: str, message: Dict[str, Dict[str, str]]
) -> None:
self.logger.info(f"peer for session {session_id} {message}")

session = self.sessions[session_id]
webrtc = session.pc

if "sdp" in message:
if message["sdp"]["type"] == "offer":
_, sdpmsg = GstSdp.SDPMessage.new_from_text(message["sdp"]["sdp"])
sdp_type = GstWebRTC.WebRTCSDPType.OFFER
offer = GstWebRTC.WebRTCSessionDescription.new(sdp_type, sdpmsg)
promise = Gst.Promise.new_with_change_func(
self.on_offer_set, webrtc, session_id
)
webrtc.emit("set-remote-description", offer, promise)
self.logger.debug("set remote desc done")

elif message["sdp"]["type"] == "answer":
self.logger.warning("Consumer should not receive the answer")
else:
self.logger.error(f"SDP not properly formatted {message['sdp']}")

else:
self.logger.error(f"message not processed {message}")
Loading

0 comments on commit 268a217

Please sign in to comment.