Skip to content

Commit

Permalink
enhancement #36: working exchanging of message. draft
Browse files Browse the repository at this point in the history
  • Loading branch information
FabienDanieau committed Jan 27, 2024
1 parent 268a217 commit 83e83f6
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 39 deletions.
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dev = black==23.3.0
pytest==7.3.1
coverage==7.2.5
mypy==1.0.0
pygobject-stubs==2.10.0

[options.entry_points]
console_scripts =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@
import logging
import os

from aiortc import RTCDataChannel

from gst_signalling import GstSignallingConsumer
from gst_signalling.gst_abstract_role import GstSession
from gst_signalling.utils import find_producer_peer_id_by_name


def on_data_channel_message(data_channel, data) -> None:
logging.info(f"Message from DataChannel: {data}")
data_channel.send_string("pong")


def on_data_channel_callback(webrtc, data_channel):
data_channel.connect("on-message-string", on_data_channel_message)


def main(args: argparse.Namespace) -> None:
peer_id = find_producer_peer_id_by_name(
args.signaling_host, args.signaling_port, args.producer_name
Expand All @@ -22,22 +30,23 @@ def main(args: argparse.Namespace) -> None:
producer_peer_id=peer_id,
)

"""
@consumer.on("new_session") # type: ignore[misc]
def on_new_session(session: GstSession) -> None:
pc = session.pc
pc.connect("on-data-channel", on_data_channel_callback)
print("heeere")
"""
@pc.on("datachannel") # type: ignore[misc]
def on_datachannel(channel: RTCDataChannel) -> None:
@channel.on("message") # type: ignore[misc]
def on_message(message: str) -> None:
print("received message:", message)
channel.send("pong")
"""

@consumer.on("close_session") # type: ignore[misc]
def on_close_session(session: GstSession) -> None:
close_evt.set()
"""

async def run_consumer(consumer: GstSignallingConsumer) -> None:
await consumer.connect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
import os
import time

import aiortc
from gi.repository import Gst

from gst_signalling import GstSignallingProducer
from gst_signalling.gst_abstract_role import GstSession


def on_data_channel_message(data_channel, data) -> None:
logging.info(f"Message from DataChannel: {data}")


def main(args: argparse.Namespace) -> None:
Expand All @@ -16,11 +21,36 @@ def main(args: argparse.Namespace) -> None:
name=args.name,
)

freq_hz = 10
FREQ_HZ = 1000

"""
@producer.on("new_session") # type: ignore[misc]
def on_new_session(session: GstSession) -> None:
print("heeere")

def on_open(channel: Gst.Element) -> None:
asyncio.run_coroutine_threadsafe(send_pings(channel), loop)

async def send_pings(channel: Gst.Element) -> None:
try:
t0 = time.time()

while True:
dt = time.time() - t0
channel.send_string(f"ping: {dt:.1f}s")
await asyncio.sleep(1.0 / FREQ_HZ)
except Exception as e:
logging.error(f"{e}")

pc = session.pc
data_channel = pc.emit("create-data-channel", "chat", None)
if data_channel:
# self.on_data_channel(data_channel, pc)
data_channel.connect("on-open", on_open)
data_channel.connect("on-message-string", on_data_channel_message)
else:
logging.error("Failed to create data channel")

"""
pc = session.pc
channel = pc.createDataChannel("chat")
Expand All @@ -43,7 +73,8 @@ async def send_pings() -> None:
@channel.on("open") # type: ignore[misc]
def on_open() -> None:
asyncio.ensure_future(send_pings())
"""
"""

# run event loop
loop = asyncio.get_event_loop()
try:
Expand Down
38 changes: 17 additions & 21 deletions src/gst_signalling/gst_abstract_role.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,38 +76,29 @@ def __del__(self) -> None:
Gst.deinit()

def make_send_sdp(self, sdp, type: str, session_id: str): # type: ignore[no-untyped-def]
self.logger.debug(f"send sdp {type} {sdp}")
# self.logger.debug(f"send sdp {type} {sdp}")
text = sdp.sdp.as_text()
msg = {"type": type, "sdp": text}
asyncio.run_coroutine_threadsafe(
self.send_sdp(session_id, msg), self._asyncloop
)

def send_ice_candidate_message(self, _, mlineindex, candidate): # type: ignore[no-untyped-def]
icemsg = {"ice": {"candidate": candidate, "sdpMLineIndex": mlineindex}}
self.logger.debug(icemsg)
# loop = asyncio.new_event_loop()
# loop.run_until_complete(self.send_sdp(session_id, icemsg))
def send_ice_candidate_message(self, _, mlineindex, candidate, session_id: str): # type: ignore[no-untyped-def]
icemsg = {"candidate": candidate, "sdpMLineIndex": mlineindex}
# self.logger.debug(f"hello {icemsg} {session_id}")
asyncio.run_coroutine_threadsafe(
self.send_ice(session_id, icemsg), self._asyncloop
)

def init_webrtc(self, session_id: str): # type: ignore[no-untyped-def]
webrtc = Gst.ElementFactory.make("webrtcbin")
assert webrtc

# webrtc.set_property("bundle-policy", "max-bundle")
# webrtc.set_property("stun-server", "stun://stun.l.google.com:19302")
# webrtc.set_property(
# "turn-server",
# "turn://gstreamer:[email protected]:3478",
# )
webrtc.set_property("stun-server", None)
webrtc.set_property("turn-server", None)

# webrtc.connect("on-ice-candidate", lambda *args: self._ices.append(args))
# webrtc.connect("on-negotiation-needed", self.on_negotiation_needed, session_id)
webrtc.connect("on-ice-candidate", self.send_ice_candidate_message)
# webrtc.connect(
# "notify::ice-gathering-state", self.on_ice_gathering_state_notify
# )
webrtc.set_property("bundle-policy", "max-bundle")
# webrtc.set_property("stun-server", None)
# webrtc.set_property("turn-server", None)

webrtc.connect("on-ice-candidate", self.send_ice_candidate_message, session_id)

self._pipeline.set_state(Gst.State.READY)
self._pipeline.add(webrtc)
Expand Down Expand Up @@ -143,6 +134,11 @@ async def peer_for_session(
) -> None:
self.logger.info(f"peer for session {session_id} {message}")

def handle_ice_message(self, webrtc, ice_msg) -> None: # type: ignore[no-untyped-def]
candidate = ice_msg["candidate"]
sdpmlineindex = ice_msg["sdpMLineIndex"]
webrtc.emit("add-ice-candidate", sdpmlineindex, candidate)

async def close_session(self, session_id: str) -> None:
self.logger.info("close session")
"""
Expand Down
8 changes: 5 additions & 3 deletions src/gst_signalling/gst_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ def on_answer_created(
) -> None:
assert promise.wait() == Gst.PromiseResult.REPLIED
reply = promise.get_reply()
answer = reply["answer"]
# answer = reply["answer"]
answer = reply.get_value("answer")
promise = Gst.Promise.new()
webrtc.emit("set-local-description", answer, promise)
promise.interrupt() # we don't care about the result, discard it
# self.send_sdp(answer)
self.logger.debug(f"here {answer}")
self.make_send_sdp(answer, "answer", session_id)

def on_offer_set(
Expand Down Expand Up @@ -85,5 +84,8 @@ async def peer_for_session(
else:
self.logger.error(f"SDP not properly formatted {message['sdp']}")

elif "ice" in message:
self.handle_ice_message(webrtc, message["ice"])

else:
self.logger.error(f"message not processed {message}")
32 changes: 26 additions & 6 deletions src/gst_signalling/gst_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def on_offer_created(self, promise: Gst.Promise, webrtc: Gst.Element, session_id
self.logger.debug(f"on offer created {promise} {webrtc} {session_id}")
assert promise.wait() == Gst.PromiseResult.REPLIED
reply = promise.get_reply()
offer = reply["offer"]
# offer = reply["offer"]
offer = reply.get_value("offer")

promise = Gst.Promise.new()
self.logger.info("Offer created, setting local description")
Expand All @@ -47,16 +48,34 @@ async def setup_session(self, session_id: str, peer_id: str) -> GstSession:
pc = session.pc
pc.connect("on-negotiation-needed", self.on_negotiation_needed, session_id)

pc.connect("on-data-channel", self.on_data_channel)

self._pipeline.set_state(Gst.State.PLAYING)

"""
data_channel = pc.emit("create-data-channel", "myLabel", None)
if data_channel:
self.on_data_channel(data_channel, pc)
else:
self.logger.error("Failed to create data channel")
"""
self.emit("new_session", session)

return session

def on_data_channel(self, webrtc, channel):
self.logger.info("data_channel created")
# Fonction de callback pour la gestion des messages du canal de données
def on_data_channel_message(self, data_channel, data, length, user_data) -> None:
self.logger.info(f"Message from DataChannel: {data}")

# Fonction de callback pour la gestion de l'état du canal de données
def on_data_channel_state_change(self, data_channel, user_data) -> None:
state = data_channel.get_state()
self.logger.info(f"DataChannel State Changed: {state}")

# Fonction de callback pour la création du canal de données
def on_data_channel(self, data_channel, webrtc) -> None:
self.logger.info("DataChannel created")
data_channel.connect("on-message-string", self.on_data_channel_message)
# data_channel.connect("on-state-change", self.on_data_channel_state_change)

async def peer_for_session(
self, session_id: str, message: Dict[str, Dict[str, str]]
Expand All @@ -82,6 +101,7 @@ async def peer_for_session(
self.logger.warning("producer should not receive the offer")
else:
self.logger.error(f"SDP not properly formatted {message['sdp']}")

elif "ice" in message:
self.handle_ice_message(webrtc, message["ice"])
else:
self.logger.error(f"message not processed {message}")

0 comments on commit 83e83f6

Please sign in to comment.