Skip to content

Commit

Permalink
Improved codecs handling (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
spietras authored Jun 26, 2024
1 parent 6866572 commit 46b6fce
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 37 deletions.
79 changes: 59 additions & 20 deletions plugins/python/customwhipserversrc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
from gi.repository import GObject, Gst
import gi

gi.require_version("GObject", "2.0")
gi.require_version("Gst", "1.0")
gi.require_version("GstWebRTC", "1.0")

from gi.repository import GObject, Gst, GstWebRTC # noqa: E402


class CustomWhipServerSrc(Gst.Bin):
Expand All @@ -9,30 +15,47 @@ class CustomWhipServerSrc(Gst.Bin):
"radio-aktywne",
)

__gsttemplates__ = Gst.PadTemplate.new(
"audio_%u",
Gst.PadDirection.SRC,
Gst.PadPresence.SOMETIMES,
Gst.Caps.from_string("audio/x-raw(ANY); application/x-rtp; audio/x-opus"),
)
_min: int | None
_max: int | None
_whip: Gst.Element
_signaller: GObject.GInterface
_webrtcbin: Gst.Element | None
_agent: GstWebRTC.WebRTCICE | None

def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)

self._min = None
self._max = None
self._whip = self._setup_whip()
self._signaller = self._setup_signaller()
self._webrtcbin = None
self._agent = None

self._whip = Gst.ElementFactory.make("whipserversrc", "whip")
self._signaller = self._whip.get_property("signaller")
def _setup_whip(self) -> Gst.Element:
whip = Gst.ElementFactory.make_with_properties(
"whipserversrc",
["audio-codecs", "video-codecs"],
[Gst.ValueArray(["OPUS"]), Gst.ValueArray([])],
)

# Needed due to https://gitlab.gnome.org/GNOME/pygobject/-/issues/605
self._agent = None
self.add(whip)

templates = whip.get_pad_template_list()
for template in templates:
self.add_pad_template(template)

whip.connect("pad-added", self._on_pad_added)
whip.connect("pad-removed", self._on_pad_removed)

self.add(self._whip)
return whip

self._whip.connect("pad-added", self._on_pad_added)
self._whip.connect("pad-removed", self._on_pad_removed)
self._signaller.connect("webrtcbin-ready", self._on_webrtcbin_ready)
def _setup_signaller(self) -> Gst.Element:
signaller = self._whip.get_property("signaller")

signaller.connect("webrtcbin-ready", self._on_webrtcbin_ready)

return signaller

@GObject.Property(
type=str,
Expand Down Expand Up @@ -100,33 +123,49 @@ def max(self, value: int) -> None:
else:
self._max = value

@GObject.Property(
type=str,
nick="Codec",
blurb="Codec to use",
)
def codec(self) -> str:
codecs = self._whip.get_property("audio-codecs")
return codecs[0]

@codec.setter
def codec(self, value: str) -> None:
self._whip.set_property("audio-codecs", Gst.ValueArray([value]))

def _on_pad_added(
self, element: Gst.Element, pad: Gst.Pad, *args, **kwargs
) -> None:
"""Handle the pad-added signal from the whipserversrc element."""

ghost_pad = Gst.GhostPad.new(pad.get_name(), pad)
name = pad.get_name()
ghost_pad = Gst.GhostPad.new(name, pad)
self.add_pad(ghost_pad)

def _on_pad_removed(
self, element: Gst.Element, pad: Gst.Pad, *args, **kwargs
) -> None:
"""Handle the pad-removed signal from the whipserversrc element."""

ghost_pad = self.get_static_pad(pad.get_name())
name = pad.get_name()
ghost_pad = self.get_static_pad(name)
self.remove_pad(ghost_pad)

def _on_webrtcbin_ready(
self,
self_obj: Gst.Object,
signaller: Gst.Object,
peer_id: str,
webrtcbin: Gst.Element,
*args,
**kwargs
**kwargs,
) -> None:
"""Handle the webrtcbin-ready signal from the signaller."""

self._agent = webrtcbin.get_property("ice-agent")
self._webrtcbin = webrtcbin
self._agent = self._webrtcbin.get_property("ice-agent")

if self._min is not None:
self._agent.set_property("min-rtp-port", self._min)
Expand Down
11 changes: 10 additions & 1 deletion src/emipass/streaming/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
from emipass.models.base import SerializableModel


class Codec(StrEnum):
OPUS = "opus"


class Format(StrEnum):
OGG = "ogg"

Expand Down Expand Up @@ -56,10 +60,15 @@ class Request(SerializableModel):
title="Request.STUN",
description="STUN server to use.",
)
codec: Codec = Field(
Codec.OPUS,
title="Request.Codec",
description="Codec of the media in the stream.",
)
format: Format = Field(
Format.OGG,
title="Request.Format",
description="Format of the output audio.",
description="Format of the media in the stream.",
)
srt: SRTServer = Field(
...,
Expand Down
39 changes: 25 additions & 14 deletions src/emipass/streaming/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pystreams.stream import Stream

from emipass.config.models import Config
from emipass.streaming.models import Format, SRTServer, STUNServer
from emipass.streaming.models import Codec, Format, SRTServer, STUNServer


class StreamRunner:
Expand All @@ -14,16 +14,23 @@ class StreamRunner:
def __init__(self, config: Config) -> None:
self._config = config

def _build_input_node(self, port: int, stun: STUNServer) -> GStreamerNode:
def _build_input_node(
self, port: int, stun: STUNServer, codec: Codec
) -> GStreamerNode:
"""Builds an input node."""

codecs = {
Codec.OPUS: "OPUS",
}

return GStreamerNode(
element="customwhipserversrc",
properties={
"address": f"http://{self._config.server.host}:{port}",
"stun": f"stun://{stun.host}:{stun.port}",
"min": self._config.server.ports.rtp.min,
"max": self._config.server.ports.rtp.max,
"codec": codecs[codec],
},
)

Expand All @@ -37,17 +44,19 @@ def _build_watchdog_node(self) -> GStreamerNode:
},
)

def _build_converter_node(self) -> GStreamerNode:
"""Builds a converter node."""
def _build_extractor_node(self, codec: Codec) -> GStreamerNode:
"""Builds an extractor node."""

return GStreamerNode(element="audioconvert")
match codec:
case Codec.OPUS:
return GStreamerNode(element="rtpopusdepay")

def _build_encoder_node(self, format: Format) -> GStreamerNode:
"""Builds an encoder node."""
def _build_parser_node(self, codec: Codec) -> GStreamerNode:
"""Builds a parser node."""

match format:
case Format.OGG:
return GStreamerNode(element="opusenc")
match codec:
case Codec.OPUS:
return GStreamerNode(element="opusparse")

def _build_muxer_node(self, format: Format) -> GStreamerNode:
"""Builds a muxer node."""
Expand All @@ -70,17 +79,18 @@ def _build_stream_metadata(
self,
port: int,
stun: STUNServer,
codec: Codec,
format: Format,
srt: SRTServer,
) -> GStreamerStreamMetadata:
"""Builds stream metadata."""

return GStreamerStreamMetadata(
nodes=[
self._build_input_node(port, stun),
self._build_input_node(port, stun, codec),
self._build_watchdog_node(),
self._build_converter_node(),
self._build_encoder_node(format),
self._build_extractor_node(codec),
self._build_parser_node(codec),
self._build_muxer_node(format),
self._build_output_node(srt),
]
Expand All @@ -95,10 +105,11 @@ async def run(
self,
port: int,
stun: STUNServer,
codec: Codec,
format: Format,
srt: SRTServer,
) -> Stream:
"""Run the stream."""

metadata = self._build_stream_metadata(port, stun, format, srt)
metadata = self._build_stream_metadata(port, stun, codec, format, srt)
return await self._run_stream(metadata)
13 changes: 11 additions & 2 deletions src/emipass/streaming/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@

from emipass.config.models import Config
from emipass.streaming.errors import NoPortsAvailableError
from emipass.streaming.models import Format, Request, Response, SRTServer, STUNServer
from emipass.streaming.models import (
Codec,
Format,
Request,
Response,
SRTServer,
STUNServer,
)
from emipass.streaming.runner import StreamRunner


Expand Down Expand Up @@ -62,6 +69,7 @@ async def _run(
self,
port: int,
stun: STUNServer,
codec: Codec,
format: Format,
srt: SRTServer,
) -> None:
Expand All @@ -71,6 +79,7 @@ async def _run(
stream = await runner.run(
port=port,
stun=stun,
codec=codec,
format=format,
srt=srt,
)
Expand All @@ -84,7 +93,7 @@ async def stream(self, request: Request) -> Response:
stun = request.stun or self._get_default_stun()

try:
await self._run(port, stun, request.format, request.srt)
await self._run(port, stun, request.codec, request.format, request.srt)

return Response(port=port, stun=stun)
except Exception:
Expand Down

0 comments on commit 46b6fce

Please sign in to comment.