diff --git a/ocp_pipeline/opm.py b/ocp_pipeline/opm.py index 370f904..bfcd932 100644 --- a/ocp_pipeline/opm.py +++ b/ocp_pipeline/opm.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from os.path import join, dirname from threading import RLock -from typing import Tuple, Optional, Dict, List, Union +from typing import Tuple, Optional, Dict, List, Union, Any from langcodes import closest_match from ovos_bus_client.apis.ocp import ClassicAudioServiceInterface @@ -38,6 +38,11 @@ class OCPPlayerProxy: media_type: MediaType = MediaType.GENERIC +# for easier typing +RawResultsList = List[Union[MediaEntry, Playlist, PluginStream, Dict[str, Any]]] +NormalizedResultsList = List[Union[MediaEntry, Playlist, PluginStream]] + + class OCPPipelineMatcher(ConfidenceMatcherPipeline, OVOSAbstractApplication): intents = ["play.intent", "open.intent", "media_stop.intent", "next.intent", "prev.intent", "pause.intent", "play_favorites.intent", @@ -145,7 +150,6 @@ def register_ocp_api_events(self): @classmethod def load_intent_files(cls): intent_files = cls.load_resource_files() - for lang, intent_data in intent_files.items(): lang = standardize_lang_tag(lang) cls.intent_matchers[lang] = IntentContainer() @@ -536,17 +540,26 @@ def handle_play_intent(self, message: Message): results = [r for r in results if r.as_dict != best.as_dict] results.insert(0, best) self.set_context("Playing", origin=OCP_ID) + try: + # ovos-PHAL-plugin-mk1 will display music icon in response to play message + player = self.get_player(message) + LOG.debug(f"OCP player: {player}") + if not player.ocp_available: + LOG.debug(f"OCP legacy play: {results}") + self.legacy_play(results, query, message=message) + else: + LOG.debug(f"OCP play: {results}") + self.ocp_api.play(tracks=[best], utterance=query, source_message=message) - # ovos-PHAL-plugin-mk1 will display music icon in response to play message - player = self.get_player(message) - if not player.ocp_available: - self.legacy_play(results, query, message=message) - else: - self.ocp_api.play(tracks=[best], utterance=query, source_message=message) - self.ocp_api.populate_search_results(tracks=results, - replace=True, - sort_by_conf=False, # already sorted - source_message=message) + LOG.debug(f"OCP populate results: {results}") + self.ocp_api.populate_search_results(tracks=results, + replace=True, + sort_by_conf=False, # already sorted + source_message=message) + except Exception as e: + LOG.exception(f"ERROR: {e}") + + LOG.debug(f"OCP handled: {query}") def handle_open_intent(self, message: Message): LOG.info("Requesting OCP homescreen") @@ -756,6 +769,29 @@ def _should_resume(self, phrase: str, lang: str, message: Optional[Message] = No return False # search + def _player_sync(self, player: OCPPlayerProxy, message: Optional[Message] = None, timeout=1) -> OCPPlayerProxy: + + if not self.config.get("legacy"): # force legacy audio in config + ev = threading.Event() + + def handle_m(m): + nonlocal player + s = SessionManager.get(m) + if s.session_id == player.session_id: + player.available_extractors = m.data["SEI"] + player.ocp_available = True + ev.set() + LOG.info(f"Session: {player.session_id} Available stream extractor plugins: {m.data['SEI']}") + + self.bus.on("ovos.common_play.SEI.get.response", handle_m) + message = message or dig_for_message() or Message("") # get message.context to forward + self.bus.emit(message.forward("ovos.common_play.SEI.get")) + ev.wait(timeout) + self.bus.remove("ovos.common_play.SEI.get.response", handle_m) + + self.update_player_proxy(player) + return player + def get_player(self, message: Optional[Message] = None, timeout=1) -> OCPPlayerProxy: """get a PlayerProxy object, containing info such as player state and the available stream extractors from OCP this is tracked per Session, if needed requests the info from the client""" @@ -764,27 +800,15 @@ def get_player(self, message: Optional[Message] = None, timeout=1) -> OCPPlayerP player = OCPPlayerProxy(available_extractors=available_extractors(), ocp_available=False, session_id=sess.session_id) - if not self.config.get("legacy"): # force legacy audio in config - ev = threading.Event() - - def handle_m(m): - s = SessionManager.get(m) - if s.session_id == player.session_id: - player.available_extractors = m.data["SEI"] - player.ocp_available = True - ev.set() - LOG.info(f"Session: {player.session_id} Available stream extractor plugins: {m.data['SEI']}") - - self.bus.on("ovos.common_play.SEI.get.response", handle_m) - message = message or dig_for_message() or Message("") # get message.context to forward - self.bus.emit(message.forward("ovos.common_play.SEI.get")) - ev.wait(timeout) - self.bus.remove("ovos.common_play.SEI.get.response", handle_m) - self.update_player_proxy(player) - - return self.ocp_sessions[sess.session_id] + else: + player = self.ocp_sessions[sess.session_id] + if not player.ocp_available and not self.config.get("legacy"): + # OCP might have loaded meanwhile + player = self._player_sync(player, message, timeout) + return player - def normalize_results(self, results: list) -> List[Union[MediaEntry, Playlist, PluginStream]]: + @staticmethod + def normalize_results(results: RawResultsList) -> NormalizedResultsList: # support Playlist and MediaEntry objects in tracks for idx, track in enumerate(results): if isinstance(track, dict): @@ -975,29 +999,44 @@ def select_best(results: list, message: Message) -> Union[MediaEntry, Playlist, ################## # Legacy Audio subsystem API - def legacy_play(self, results: List[Union[MediaEntry, Playlist, PluginStream]], phrase="", + def legacy_play(self, results: NormalizedResultsList, phrase="", message: Optional[Message] = None): - res = [] - for r in results: + player = self.get_player(message) + player.media_state = MediaState.LOADING_MEDIA + playing = False + for idx, r in enumerate(results): + LOG.debug(f"result idx: {idx}") + real_uri = None if not (r.playback == PlaybackType.AUDIO or r.media_type in OCPQuery.cast2audio): # we need to filter video results + LOG.debug(f"skipping: {r}") continue if isinstance(r, Playlist): # get internal entries from the playlist - for e in r.entries: - res.append(e.uri) + real_uri = [e.uri for e in r.entries] elif isinstance(r, MediaEntry): - res.append(r.uri) + real_uri = r.uri elif isinstance(r, PluginStream): # for legacy audio service we need to do stream extraction here - res.append(r.extract_uri(video=False)) - - self.legacy_api.play(res, utterance=phrase, source_message=message) + LOG.debug(f"extracting uri: {r.stream}") + # TODO - apparently it can hang here forever ??? + # happens with https://www.cbc.ca/podcasting/includes/hourlynews.xml from news skill + try: + real_uri = r.extract_uri(video=False) + except Exception as e: + LOG.exception(f"extraction failed: {r}") + if not real_uri: + continue + if not playing: + playing = True + LOG.debug(f"do play: {real_uri}") + self.legacy_api.play(real_uri, utterance=phrase, source_message=message) + player.player_state = PlayerState.PLAYING + self.update_player_proxy(player) + else: + LOG.debug(f"queue next: {real_uri}") + self.legacy_api.queue(real_uri, source_message=message) - player = self.get_player(message) - player.player_state = PlayerState.PLAYING - player.media_state = MediaState.LOADING_MEDIA - self.update_player_proxy(player) def _handle_legacy_audio_stop(self, message: Message): player = self.get_player(message)