Skip to content

Commit

Permalink
improve OCP detection + speed up legacy playback start (#34)
Browse files Browse the repository at this point in the history
* fix: improve OCP detection + speed up legacy playback start

* improve OCP detection + speed up legacy playback start

* Update ocp_pipeline/opm.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
  • Loading branch information
JarbasAl and coderabbitai[bot] authored Nov 22, 2024
1 parent d7c872f commit eb496b3
Showing 1 changed file with 63 additions and 35 deletions.
98 changes: 63 additions & 35 deletions ocp_pipeline/opm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass
from os.path import join, dirname
from threading import RLock
from typing import Tuple, Optional, Dict, List, Union
from typing import Tuple, Optional, Dict, List, Union, Any

from langcodes import closest_match
from ovos_bus_client.apis.ocp import ClassicAudioServiceInterface
Expand Down Expand Up @@ -38,6 +38,11 @@ class OCPPlayerProxy:
media_type: MediaType = MediaType.GENERIC


# for easier typing
RawResultsList = List[Union[MediaEntry, Playlist, PluginStream, Dict[str, Any]]]
NormalizedResultsList = List[Union[MediaEntry, Playlist, PluginStream]]


class OCPPipelineMatcher(ConfidenceMatcherPipeline, OVOSAbstractApplication):
intents = ["play.intent", "open.intent", "media_stop.intent",
"next.intent", "prev.intent", "pause.intent", "play_favorites.intent",
Expand Down Expand Up @@ -145,7 +150,6 @@ def register_ocp_api_events(self):
@classmethod
def load_intent_files(cls):
intent_files = cls.load_resource_files()

for lang, intent_data in intent_files.items():
lang = standardize_lang_tag(lang)
cls.intent_matchers[lang] = IntentContainer()
Expand Down Expand Up @@ -756,6 +760,31 @@ def _should_resume(self, phrase: str, lang: str, message: Optional[Message] = No
return False

# search
def _player_sync(self, player: OCPPlayerProxy, message: Optional[Message] = None, timeout=1) -> OCPPlayerProxy:

if not self.config.get("legacy"): # force legacy audio in config
ev = threading.Event()

def handle_m(m):
nonlocal player
s = SessionManager.get(m)
if s.session_id == player.session_id:
player.available_extractors = m.data["SEI"]
player.ocp_available = True
self.update_player_proxy(player)
ev.set()
LOG.info(f"Session: {player.session_id} Available stream extractor plugins: {m.data['SEI']}")

self.bus.on("ovos.common_play.SEI.get.response", handle_m)
message = message or dig_for_message() or Message("") # get message.context to forward
self.bus.emit(message.forward("ovos.common_play.SEI.get"))
ev.wait(timeout)
self.bus.remove("ovos.common_play.SEI.get.response", handle_m)

if not ev.is_set():
LOG.warning(f"Player synchronization timed out after {timeout} seconds")

return player
def get_player(self, message: Optional[Message] = None, timeout=1) -> OCPPlayerProxy:
"""get a PlayerProxy object, containing info such as player state and the available stream extractors from OCP
this is tracked per Session, if needed requests the info from the client"""
Expand All @@ -764,27 +793,15 @@ def get_player(self, message: Optional[Message] = None, timeout=1) -> OCPPlayerP
player = OCPPlayerProxy(available_extractors=available_extractors(),
ocp_available=False,
session_id=sess.session_id)
if not self.config.get("legacy"): # force legacy audio in config
ev = threading.Event()

def handle_m(m):
s = SessionManager.get(m)
if s.session_id == player.session_id:
player.available_extractors = m.data["SEI"]
player.ocp_available = True
ev.set()
LOG.info(f"Session: {player.session_id} Available stream extractor plugins: {m.data['SEI']}")

self.bus.on("ovos.common_play.SEI.get.response", handle_m)
message = message or dig_for_message() or Message("") # get message.context to forward
self.bus.emit(message.forward("ovos.common_play.SEI.get"))
ev.wait(timeout)
self.bus.remove("ovos.common_play.SEI.get.response", handle_m)
self.update_player_proxy(player)

return self.ocp_sessions[sess.session_id]
else:
player = self.ocp_sessions[sess.session_id]
if not player.ocp_available and not self.config.get("legacy"):
# OCP might have loaded meanwhile
player = self._player_sync(player, message, timeout)
return player

def normalize_results(self, results: list) -> List[Union[MediaEntry, Playlist, PluginStream]]:
@staticmethod
def normalize_results(results: RawResultsList) -> NormalizedResultsList:
# support Playlist and MediaEntry objects in tracks
for idx, track in enumerate(results):
if isinstance(track, dict):
Expand Down Expand Up @@ -975,29 +992,40 @@ def select_best(results: list, message: Message) -> Union[MediaEntry, Playlist,

##################
# Legacy Audio subsystem API
def legacy_play(self, results: List[Union[MediaEntry, Playlist, PluginStream]], phrase="",
def legacy_play(self, results: NormalizedResultsList, phrase="",
message: Optional[Message] = None):
res = []
for r in results:
player = self.get_player(message)
player.media_state = MediaState.LOADING_MEDIA
playing = False
for idx, r in enumerate(results):
real_uri = None
if not (r.playback == PlaybackType.AUDIO or r.media_type in OCPQuery.cast2audio):
# we need to filter video results
continue
if isinstance(r, Playlist):
# get internal entries from the playlist
for e in r.entries:
res.append(e.uri)
real_uri = [e.uri for e in r.entries]
elif isinstance(r, MediaEntry):
res.append(r.uri)
real_uri = r.uri
elif isinstance(r, PluginStream):
# for legacy audio service we need to do stream extraction here
res.append(r.extract_uri(video=False))

self.legacy_api.play(res, utterance=phrase, source_message=message)
LOG.debug(f"extracting uri: {r.stream}")
# TODO - apparently it can hang here forever ???
# happens with https://www.cbc.ca/podcasting/includes/hourlynews.xml from news skill
try:
real_uri = r.extract_uri(video=False)
except Exception as e:
LOG.exception(f"extraction failed: {r}")
if not real_uri:
continue
if not playing:
playing = True
self.legacy_api.play(real_uri, utterance=phrase, source_message=message)
player.player_state = PlayerState.PLAYING
self.update_player_proxy(player)
else:
self.legacy_api.queue(real_uri, source_message=message)

player = self.get_player(message)
player.player_state = PlayerState.PLAYING
player.media_state = MediaState.LOADING_MEDIA
self.update_player_proxy(player)

def _handle_legacy_audio_stop(self, message: Message):
player = self.get_player(message)
Expand Down

0 comments on commit eb496b3

Please sign in to comment.