diff --git a/ovos_bus_client/apis/ocp.py b/ovos_bus_client/apis/ocp.py index d2b3cd1..6d0d828 100644 --- a/ovos_bus_client/apis/ocp.py +++ b/ovos_bus_client/apis/ocp.py @@ -12,9 +12,10 @@ # import time from datetime import timedelta +from functools import wraps from os.path import abspath from threading import Lock -from typing import List, Union +from typing import List, Union, Optional from ovos_utils.gui import is_gui_connected, is_gui_running from ovos_utils.log import LOG, deprecated @@ -48,6 +49,32 @@ def ensure_uri(s: str): raise ValueError('Invalid track') +def _ensure_message_kwarg(): + """ensure message kwarg is present + NOTE: this is meant for usage only in this module, it is not a generic decorator! + """ + + def message_injector(func): + # this method ensures all skills messages are .forward from the utterance + # that triggered the skill, this ensures proper routing and metadata in message.context + @wraps(func) + def call_function(*args, **kwargs): + if not any([isinstance(a, Message) for a in args]): + m = kwargs.get("source_message") + if not m: + source_message = dig_for_message(max_records=50) + if source_message: + kwargs["source_message"] = source_message + else: + LOG.warning("source message could not be determined, message.context has been lost!") + kwargs["source_message"] = Message("") + return func(*args, **kwargs) + + return call_function + + return message_injector + + class ClassicAudioServiceInterface: """AudioService class for interacting with the classic mycroft audio subsystem @@ -68,13 +95,15 @@ class ClassicAudioServiceInterface: def __init__(self, bus=None): self.bus = bus or get_mycroft_bus() - def queue(self, tracks=None): + @_ensure_message_kwarg() + def queue(self, tracks=None, source_message: Optional[Message] = None): """Queue up a track to playing playlist. Args: tracks: track uri or list of track uri's Each track can be added as a tuple with (uri, mime) to give a hint of the mime type to the system + source_message: bus message that triggered this action """ tracks = tracks or [] if isinstance(tracks, (str, tuple)): @@ -82,10 +111,11 @@ def queue(self, tracks=None): elif not isinstance(tracks, list): raise ValueError tracks = [ensure_uri(t) for t in tracks] - self.bus.emit(Message('mycroft.audio.service.queue', - data={'tracks': tracks})) + self.bus.emit(source_message.forward('mycroft.audio.service.queue', + {'tracks': tracks})) - def play(self, tracks=None, utterance=None, repeat=None): + @_ensure_message_kwarg() + def play(self, tracks=None, utterance=None, repeat=None, source_message: Optional[Message] = None): """Start playback. Args: @@ -95,6 +125,7 @@ def play(self, tracks=None, utterance=None, repeat=None): utterance: forward utterance for further processing by the audio service. repeat: if the playback should be looped + source_message: bus message that triggered this action """ repeat = repeat or False tracks = tracks or [] @@ -104,119 +135,156 @@ def play(self, tracks=None, utterance=None, repeat=None): elif not isinstance(tracks, list): raise ValueError tracks = [ensure_uri(t) for t in tracks] - self.bus.emit(Message('mycroft.audio.service.play', - data={'tracks': tracks, - 'utterance': utterance, - 'repeat': repeat})) - - def stop(self): - """Stop the track.""" - self.bus.emit(Message('mycroft.audio.service.stop')) + self.bus.emit(source_message.forward('mycroft.audio.service.play', + {'tracks': tracks, + 'utterance': utterance, + 'repeat': repeat})) + + @_ensure_message_kwarg() + def stop(self, source_message: Optional[Message] = None): + """Stop the track. + Args: + source_message: bus message that triggered this action""" + self.bus.emit(source_message.forward('mycroft.audio.service.stop')) - def next(self): - """Change to next track.""" - self.bus.emit(Message('mycroft.audio.service.next')) + @_ensure_message_kwarg() + def next(self, source_message: Optional[Message] = None): + """Change to next track. + Args: + source_message: bus message that triggered this action""" + self.bus.emit(source_message.forward('mycroft.audio.service.next')) - def prev(self): - """Change to previous track.""" - self.bus.emit(Message('mycroft.audio.service.prev')) + @_ensure_message_kwarg() + def prev(self, source_message: Optional[Message] = None): + """Change to previous track. + Args: + source_message: bus message that triggered this action""" + self.bus.emit(source_message.forward('mycroft.audio.service.prev')) - def pause(self): - """Pause playback.""" - self.bus.emit(Message('mycroft.audio.service.pause')) + @_ensure_message_kwarg() + def pause(self, source_message: Optional[Message] = None): + """Pause playback. + Args: + source_message: bus message that triggered this action""" + self.bus.emit(source_message.forward('mycroft.audio.service.pause')) - def resume(self): - """Resume paused playback.""" - self.bus.emit(Message('mycroft.audio.service.resume')) + @_ensure_message_kwarg() + def resume(self, source_message: Optional[Message] = None): + """Resume paused playback. + Args: + source_message: bus message that triggered this action""" + self.bus.emit(source_message.forward('mycroft.audio.service.resume')) - def get_track_length(self): + @_ensure_message_kwarg() + def get_track_length(self, source_message: Optional[Message] = None): """ getting the duration of the audio in seconds + Args: + source_message: bus message that triggered this action """ length = 0 info = self.bus.wait_for_response( - Message('mycroft.audio.service.get_track_length'), + source_message.forward('mycroft.audio.service.get_track_length'), timeout=1) if info: length = info.data.get("length") or 0 return length / 1000 # convert to seconds - def get_track_position(self): + @_ensure_message_kwarg() + def get_track_position(self, source_message: Optional[Message] = None): """ get current position in seconds + Args: + source_message: bus message that triggered this action """ pos = 0 info = self.bus.wait_for_response( - Message('mycroft.audio.service.get_track_position'), + source_message.forward('mycroft.audio.service.get_track_position'), timeout=1) if info: pos = info.data.get("position") or 0 return pos / 1000 # convert to seconds - def set_track_position(self, seconds): + @_ensure_message_kwarg() + def set_track_position(self, seconds, source_message: Optional[Message] = None): """Seek X seconds. Arguments: seconds (int): number of seconds to seek, if negative rewind + source_message: bus message that triggered this action """ - self.bus.emit(Message('mycroft.audio.service.set_track_position', - {"position": seconds * 1000})) # convert to ms + self.bus.emit(source_message.forward('mycroft.audio.service.set_track_position', + {"position": seconds * 1000})) # convert to ms - def seek(self, seconds: Union[int, float, timedelta] = 1): + @_ensure_message_kwarg() + def seek(self, seconds: Union[int, float, timedelta] = 1, + source_message: Optional[Message] = None): """Seek X seconds. Args: seconds (int): number of seconds to seek, if negative rewind + source_message: bus message that triggered this action """ if isinstance(seconds, timedelta): seconds = seconds.total_seconds() if seconds < 0: - self.seek_backward(abs(seconds)) + self.seek_backward(abs(seconds), source_message=source_message) else: - self.seek_forward(seconds) + self.seek_forward(seconds, source_message=source_message) - def seek_forward(self, seconds: Union[int, float, timedelta] = 1): + @_ensure_message_kwarg() + def seek_forward(self, seconds: Union[int, float, timedelta] = 1, + source_message: Optional[Message] = None): """Skip ahead X seconds. Args: seconds (int): number of seconds to skip + source_message: bus message that triggered this action """ if isinstance(seconds, timedelta): seconds = seconds.total_seconds() - self.bus.emit(Message('mycroft.audio.service.seek_forward', - {"seconds": seconds})) + self.bus.emit(source_message.forward('mycroft.audio.service.seek_forward', + {"seconds": seconds})) - def seek_backward(self, seconds: Union[int, float, timedelta] = 1): + @_ensure_message_kwarg() + def seek_backward(self, seconds: Union[int, float, timedelta] = 1, source_message: Optional[Message] = None): """Rewind X seconds Args: seconds (int): number of seconds to rewind + source_message: bus message that triggered this action """ if isinstance(seconds, timedelta): seconds = seconds.total_seconds() - self.bus.emit(Message('mycroft.audio.service.seek_backward', - {"seconds": seconds})) + self.bus.emit(source_message.forward('mycroft.audio.service.seek_backward', + {"seconds": seconds})) - def track_info(self): + @_ensure_message_kwarg() + def track_info(self, source_message: Optional[Message] = None): """Request information of current playing track. + Args: + source_message: bus message that triggered this action Returns: Dict with track info. """ info = self.bus.wait_for_response( - Message('mycroft.audio.service.track_info'), + source_message.forward('mycroft.audio.service.track_info'), reply_type='mycroft.audio.service.track_info_reply', timeout=1) return info.data if info else {} - def available_backends(self): + @_ensure_message_kwarg() + def available_backends(self, source_message: Optional[Message] = None): """Return available audio backends. + Args: + source_message: bus message that triggered this action Returns: dict with backend names as keys """ - msg = Message('mycroft.audio.service.list_backends') - response = self.bus.wait_for_response(msg) + m = source_message.forward('mycroft.audio.service.list_backends') + response = self.bus.wait_for_response(m) return response.data if response else {} @property @@ -234,21 +302,6 @@ class OCPInterface: def __init__(self, bus=None): self.bus = bus or get_mycroft_bus() - def _format_msg(self, msg_type, msg_data=None): - # this method ensures all skills are .forward from the utterance - # that triggered the skill, this ensures proper routing and metadata - msg_data = msg_data or {} - msg = dig_for_message() - if msg: - msg = msg.forward(msg_type, msg_data) - else: - msg = Message(msg_type, msg_data) - # at this stage source == skills, lets indicate audio service took over - sauce = msg.context.get("source") - if sauce == "skills": - msg.context["source"] = "audio_service" - return msg - # OCP bus api @staticmethod def norm_tracks(tracks: list): @@ -279,130 +332,341 @@ def norm_tracks(tracks: list): assert all(isinstance(t, (MediaEntry, Playlist, PluginStream)) for t in tracks) return tracks - def queue(self, tracks: list): + @_ensure_message_kwarg() + def queue(self, tracks: list, source_message: Optional[Message] = None): """Queue up a track to OCP playing playlist. Args: tracks: track dict or list of track dicts (OCP result style) + source_message: bus message that triggered this action """ tracks = self.norm_tracks(tracks) - msg = self._format_msg('ovos.common_play.playlist.queue', - {'tracks': tracks}) - self.bus.emit(msg) + self.bus.emit(source_message.forward('ovos.common_play.playlist.queue', + {'tracks': tracks})) - def play(self, tracks: list, utterance=None): + @_ensure_message_kwarg() + def play(self, tracks: list, utterance=None, source_message: Optional[Message] = None): """Start playback. Args: tracks: track dict or list of track dicts (OCP result style) utterance: forward utterance for further processing by OCP + source_message: bus message that triggered this action """ tracks = self.norm_tracks(tracks) utterance = utterance or '' tracks = [t.as_dict for t in tracks] - msg = self._format_msg('ovos.common_play.play', - {"media": tracks[0], - "playlist": tracks, - "utterance": utterance}) - self.bus.emit(msg) - - def stop(self): - """Stop the track.""" - msg = self._format_msg("ovos.common_play.stop") - self.bus.emit(msg) + self.bus.emit(source_message.forward('ovos.common_play.play', + {"media": tracks[0], + "playlist": tracks, + "utterance": utterance})) + + @_ensure_message_kwarg() + def stop(self, source_message: Optional[Message] = None): + """Stop the track. + Args: + source_message: bus message that triggered this action""" + self.bus.emit(source_message.forward("ovos.common_play.stop")) - def next(self): - """Change to next track.""" - msg = self._format_msg("ovos.common_play.next") - self.bus.emit(msg) + @_ensure_message_kwarg() + def next(self, source_message: Optional[Message] = None): + """Change to next track. + Args: + source_message: bus message that triggered this action""" + self.bus.emit(source_message.forward("ovos.common_play.next")) - def prev(self): - """Change to previous track.""" - msg = self._format_msg("ovos.common_play.previous") - self.bus.emit(msg) + @_ensure_message_kwarg() + def prev(self, source_message: Optional[Message] = None): + """Change to previous track. + Args: + source_message: bus message that triggered this action""" + self.bus.emit(source_message.forward("ovos.common_play.previous")) - def pause(self): - """Pause playback.""" - msg = self._format_msg("ovos.common_play.pause") - self.bus.emit(msg) + @_ensure_message_kwarg() + def pause(self, source_message: Optional[Message] = None): + """Pause playback. + Args: + source_message: bus message that triggered this action""" + self.bus.emit(source_message.forward("ovos.common_play.pause")) - def resume(self): - """Resume paused playback.""" - msg = self._format_msg("ovos.common_play.resume") - self.bus.emit(msg) + @_ensure_message_kwarg() + def resume(self, source_message: Optional[Message] = None): + """Resume paused playback. + Args: + source_message: bus message that triggered this action""" + self.bus.emit(source_message.forward("ovos.common_play.resume")) - def seek_forward(self, seconds=1): + @_ensure_message_kwarg() + def seek_forward(self, seconds=1, source_message: Optional[Message] = None): """Skip ahead X seconds. Args: seconds (int): number of seconds to skip + source_message: bus message that triggered this action """ if isinstance(seconds, timedelta): seconds = seconds.total_seconds() - msg = self._format_msg('ovos.common_play.seek', - {"seconds": seconds}) - self.bus.emit(msg) + self.bus.emit(source_message.forward('ovos.common_play.seek', + {"seconds": seconds})) - def seek_backward(self, seconds=1): + @_ensure_message_kwarg() + def seek_backward(self, seconds=1, source_message: Optional[Message] = None): """Rewind X seconds Args: seconds (int): number of seconds to rewind + source_message: bus message that triggered this action """ if isinstance(seconds, timedelta): seconds = seconds.total_seconds() - msg = self._format_msg('ovos.common_play.seek', - {"seconds": seconds * -1}) - self.bus.emit(msg) + self.bus.emit(source_message.forward('ovos.common_play.seek', + {"seconds": seconds * -1})) - def get_track_length(self): + @_ensure_message_kwarg() + def get_track_length(self, source_message: Optional[Message] = None): """ getting the duration of the audio in miliseconds + Args: + source_message: bus message that triggered this action """ length = 0 - msg = self._format_msg('ovos.common_play.get_track_length') + msg = source_message.forward('ovos.common_play.get_track_length') info = self.bus.wait_for_response(msg, timeout=1) if info: length = info.data.get("length", 0) return length - def get_track_position(self): + @_ensure_message_kwarg() + def get_track_position(self, source_message: Optional[Message] = None): """ get current position in miliseconds + Args: + source_message: bus message that triggered this action """ pos = 0 - msg = self._format_msg('ovos.common_play.get_track_position') + msg = source_message.forward('ovos.common_play.get_track_position') info = self.bus.wait_for_response(msg, timeout=1) if info: pos = info.data.get("position", 0) return pos - def set_track_position(self, miliseconds): + @_ensure_message_kwarg() + def set_track_position(self, miliseconds, source_message: Optional[Message] = None): """Go to X position. Arguments: - miliseconds (int): position to go to in miliseconds + miliseconds (int): position to go to in miliseconds + source_message: bus message that triggered this action """ - msg = self._format_msg('ovos.common_play.set_track_position', - {"position": miliseconds}) - self.bus.emit(msg) + self.bus.emit(source_message.forward('ovos.common_play.set_track_position', + {"position": miliseconds})) - def track_info(self): + @_ensure_message_kwarg() + def track_info(self, source_message: Optional[Message] = None): """Request information of current playing track. + Args: + source_message: bus message that triggered this action Returns: Dict with track info. """ - msg = self._format_msg('ovos.common_play.track_info') + msg = source_message.forward('ovos.common_play.track_info') response = self.bus.wait_for_response(msg) return response.data if response else {} - def available_backends(self): + @_ensure_message_kwarg() + def available_backends(self, source_message: Optional[Message] = None): """Return available audio backends. + Args: + source_message: bus message that triggered this action Returns: dict with backend names as keys """ - msg = self._format_msg('ovos.common_play.list_backends') + msg = source_message.forward('ovos.common_play.list_backends') response = self.bus.wait_for_response(msg) return response.data if response else {} +class OCPQuery: + try: + from ovos_utils.ocp import MediaType + cast2audio = [ + MediaType.MUSIC, + MediaType.PODCAST, + MediaType.AUDIOBOOK, + MediaType.RADIO, + MediaType.RADIO_THEATRE, + MediaType.VISUAL_STORY, + MediaType.NEWS + ] + except ImportError as e: + from enum import IntEnum + + class MediaType(IntEnum): + GENERIC = 0 # nothing else matches + + cast2audio = None + + def __init__(self, query, bus, media_type=MediaType.GENERIC, config=None): + if self.cast2audio is None: + raise RuntimeError("This class requires ovos-utils ~=0.1") + LOG.debug(f"Created {media_type.name} query: {query}") + self.query = query + self.media_type = media_type + self.bus = bus + self.config = config or {} + self.reset() + + def reset(self): + try: + from ovos_utils.ocp import PlaybackMode + except ImportError as e: + raise RuntimeError("This method requires ovos-utils ~=0.1") from e + self.active_skills = {} + self.active_skills_lock = Lock() + self.query_replies = [] + self.searching = False + self.search_start = 0 + self.query_timeouts = self.config.get("min_timeout", 5) + if self.config.get("playback_mode") in [PlaybackMode.AUDIO_ONLY]: + self.has_gui = False + else: + self.has_gui = is_gui_running() or is_gui_connected(self.bus) + + def send(self, skill_id: str = None): + self.query_replies = [] + self.query_timeouts = self.config.get("min_timeout", 5) + self.search_start = time.time() + self.searching = True + self.register_events() + if skill_id: + self.bus.emit(Message(f'ovos.common_play.query.{skill_id}', + {"phrase": self.query, + "question_type": self.media_type})) + else: + self.bus.emit(Message('ovos.common_play.query', + {"phrase": self.query, + "question_type": self.media_type})) + + def wait(self): + try: + from ovos_utils.ocp import MediaType + except ImportError as e: + raise RuntimeError("This method requires ovos-utils ~=0.1") from e + # if there is no match type defined, lets increase timeout a bit + # since all skills need to search + if self.media_type == MediaType.GENERIC: + timeout = self.config.get("max_timeout", 15) + 3 # timeout bonus + else: + timeout = self.config.get("max_timeout", 15) + while self.searching and time.time() - self.search_start <= timeout: + time.sleep(0.1) + self.searching = False + self.remove_events() + + @property + def results(self) -> List[dict]: + return [s for s in self.query_replies if s.get("results")] + + def register_events(self): + LOG.debug("Registering Search Bus Events") + self.bus.on("ovos.common_play.skill.search_start", self.handle_skill_search_start) + self.bus.on("ovos.common_play.skill.search_end", self.handle_skill_search_end) + self.bus.on("ovos.common_play.query.response", self.handle_skill_response) + + def remove_events(self): + LOG.debug("Removing Search Bus Events") + self.bus.remove_all_listeners("ovos.common_play.skill.search_start") + self.bus.remove_all_listeners("ovos.common_play.skill.search_end") + self.bus.remove_all_listeners("ovos.common_play.query.response") + + def handle_skill_search_start(self, message): + skill_id = message.data["skill_id"] + LOG.debug(f"{message.data['skill_id']} is searching") + with self.active_skills_lock: + if skill_id not in self.active_skills: + self.active_skills[skill_id] = Lock() + + def handle_skill_response(self, message): + search_phrase = message.data["phrase"] + if search_phrase != self.query: + # not an answer for this search query + return + timeout = message.data.get("timeout") + skill_id = message.data['skill_id'] + # LOG.debug(f"OVOSCommonPlay result: {skill_id}") + + # in case this handler fires before the search start handler + with self.active_skills_lock: + if skill_id not in self.active_skills: + self.active_skills[skill_id] = Lock() + with self.active_skills[skill_id]: + if message.data.get("searching"): + # extend the timeout by N seconds + if timeout and self.config.get("allow_extensions", True): + self.query_timeouts += timeout + # else -> expired search + + else: + # Collect replies until the timeout + if not self.searching and not len(self.query_replies): + LOG.debug(" too late!! ignored in track selection process") + LOG.warning(f"{skill_id} is not answering fast enough!") + return + + # populate search playlist + res = message.data.get("results", []) + LOG.debug(f'got {len(res)} results from {skill_id}') + if res: + self.query_replies.append(message.data) + + # abort searching if we gathered enough results + # TODO ensure we have a decent confidence match, if all matches + # are < 50% conf extend timeout instead + if time.time() - self.search_start > self.query_timeouts: + if self.searching: + self.searching = False + LOG.debug("common play query timeout, parsing results") + + elif self.searching: + for res in message.data.get("results", []): + if res.get("match_confidence", 0) >= \ + self.config.get("early_stop_thresh", 85): + # got a really good match, dont search further + LOG.info( + "Receiving very high confidence match, stopping " + "search early") + + # allow other skills to "just miss" + early_stop_grace = \ + self.config.get("early_stop_grace_period", 0.5) + if early_stop_grace: + LOG.debug( + f" - grace period: {early_stop_grace} seconds") + time.sleep(early_stop_grace) + self.searching = False + return + + def handle_skill_search_end(self, message): + skill_id = message.data["skill_id"] + LOG.debug(f"{message.data['skill_id']} finished search") + with self.active_skills_lock: + if skill_id in self.active_skills: + with self.active_skills[skill_id]: + del self.active_skills[skill_id] + + # if this was the last skill end searching period + time.sleep(0.5) + # TODO this sleep is hacky, but avoids a race condition in + # case some skill just decides to respond before the others even + # acknowledge search is starting, this gives more than enough time + # for self.active_skills to be populated, a better approach should + # be employed but this works fine for now + if not self.active_skills and self.searching: + LOG.info("Received search responses from all skills!") + self.searching = False + + +########################################################## +# WIP ZONE - APIs below used for ovos-media + + class OCPAudioServiceInterface: """Internal OCP audio subsystem most likely you should use OCPInterface instead @@ -848,183 +1112,3 @@ def available_backends(self): def is_playing(self): """True if the webservice is playing, else False.""" return self.track_info() != {} - - -class OCPQuery: - try: - from ovos_utils.ocp import MediaType - cast2audio = [ - MediaType.MUSIC, - MediaType.PODCAST, - MediaType.AUDIOBOOK, - MediaType.RADIO, - MediaType.RADIO_THEATRE, - MediaType.VISUAL_STORY, - MediaType.NEWS - ] - except ImportError as e: - from enum import IntEnum - - class MediaType(IntEnum): - GENERIC = 0 # nothing else matches - - cast2audio = None - - def __init__(self, query, bus, media_type=MediaType.GENERIC, config=None): - if self.cast2audio is None: - raise RuntimeError("This class requires ovos-utils ~=0.1") - LOG.debug(f"Created {media_type.name} query: {query}") - self.query = query - self.media_type = media_type - self.bus = bus - self.config = config or {} - self.reset() - - def reset(self): - try: - from ovos_utils.ocp import PlaybackMode - except ImportError as e: - raise RuntimeError("This method requires ovos-utils ~=0.1") from e - self.active_skills = {} - self.active_skills_lock = Lock() - self.query_replies = [] - self.searching = False - self.search_start = 0 - self.query_timeouts = self.config.get("min_timeout", 5) - if self.config.get("playback_mode") in [PlaybackMode.AUDIO_ONLY]: - self.has_gui = False - else: - self.has_gui = is_gui_running() or is_gui_connected(self.bus) - - def send(self, skill_id: str = None): - self.query_replies = [] - self.query_timeouts = self.config.get("min_timeout", 5) - self.search_start = time.time() - self.searching = True - self.register_events() - if skill_id: - self.bus.emit(Message(f'ovos.common_play.query.{skill_id}', - {"phrase": self.query, - "question_type": self.media_type})) - else: - self.bus.emit(Message('ovos.common_play.query', - {"phrase": self.query, - "question_type": self.media_type})) - - def wait(self): - try: - from ovos_utils.ocp import MediaType - except ImportError as e: - raise RuntimeError("This method requires ovos-utils ~=0.1") from e - # if there is no match type defined, lets increase timeout a bit - # since all skills need to search - if self.media_type == MediaType.GENERIC: - timeout = self.config.get("max_timeout", 15) + 3 # timeout bonus - else: - timeout = self.config.get("max_timeout", 15) - while self.searching and time.time() - self.search_start <= timeout: - time.sleep(0.1) - self.searching = False - self.remove_events() - - @property - def results(self) -> List[dict]: - return [s for s in self.query_replies if s.get("results")] - - def register_events(self): - LOG.debug("Registering Search Bus Events") - self.bus.on("ovos.common_play.skill.search_start", self.handle_skill_search_start) - self.bus.on("ovos.common_play.skill.search_end", self.handle_skill_search_end) - self.bus.on("ovos.common_play.query.response", self.handle_skill_response) - - def remove_events(self): - LOG.debug("Removing Search Bus Events") - self.bus.remove_all_listeners("ovos.common_play.skill.search_start") - self.bus.remove_all_listeners("ovos.common_play.skill.search_end") - self.bus.remove_all_listeners("ovos.common_play.query.response") - - def handle_skill_search_start(self, message): - skill_id = message.data["skill_id"] - LOG.debug(f"{message.data['skill_id']} is searching") - with self.active_skills_lock: - if skill_id not in self.active_skills: - self.active_skills[skill_id] = Lock() - - def handle_skill_response(self, message): - search_phrase = message.data["phrase"] - if search_phrase != self.query: - # not an answer for this search query - return - timeout = message.data.get("timeout") - skill_id = message.data['skill_id'] - # LOG.debug(f"OVOSCommonPlay result: {skill_id}") - - # in case this handler fires before the search start handler - with self.active_skills_lock: - if skill_id not in self.active_skills: - self.active_skills[skill_id] = Lock() - with self.active_skills[skill_id]: - if message.data.get("searching"): - # extend the timeout by N seconds - if timeout and self.config.get("allow_extensions", True): - self.query_timeouts += timeout - # else -> expired search - - else: - # Collect replies until the timeout - if not self.searching and not len(self.query_replies): - LOG.debug(" too late!! ignored in track selection process") - LOG.warning(f"{skill_id} is not answering fast enough!") - return - - # populate search playlist - res = message.data.get("results", []) - LOG.debug(f'got {len(res)} results from {skill_id}') - if res: - self.query_replies.append(message.data) - - # abort searching if we gathered enough results - # TODO ensure we have a decent confidence match, if all matches - # are < 50% conf extend timeout instead - if time.time() - self.search_start > self.query_timeouts: - if self.searching: - self.searching = False - LOG.debug("common play query timeout, parsing results") - - elif self.searching: - for res in message.data.get("results", []): - if res.get("match_confidence", 0) >= \ - self.config.get("early_stop_thresh", 85): - # got a really good match, dont search further - LOG.info( - "Receiving very high confidence match, stopping " - "search early") - - # allow other skills to "just miss" - early_stop_grace = \ - self.config.get("early_stop_grace_period", 0.5) - if early_stop_grace: - LOG.debug( - f" - grace period: {early_stop_grace} seconds") - time.sleep(early_stop_grace) - self.searching = False - return - - def handle_skill_search_end(self, message): - skill_id = message.data["skill_id"] - LOG.debug(f"{message.data['skill_id']} finished search") - with self.active_skills_lock: - if skill_id in self.active_skills: - with self.active_skills[skill_id]: - del self.active_skills[skill_id] - - # if this was the last skill end searching period - time.sleep(0.5) - # TODO this sleep is hacky, but avoids a race condition in - # case some skill just decides to respond before the others even - # acknowledge search is starting, this gives more than enough time - # for self.active_skills to be populated, a better approach should - # be employed but this works fine for now - if not self.active_skills and self.searching: - LOG.info("Received search responses from all skills!") - self.searching = False