From 7bf470435ab93c9ab708887ea4f7d8c655c740be Mon Sep 17 00:00:00 2001 From: NeonJarbas <59943014+NeonJarbas@users.noreply.github.com> Date: Fri, 12 Jan 2024 01:02:22 +0000 Subject: [PATCH] feat/ovos-media (#78) * feat/OCP - add some missed methods in the OCPInterface class - add apis for ovos-media - support MediaEntry and Playlist objects in the apis * Update ocp.py * Update ocp.py * Update ocp.py * Update requirements.txt --------- Co-authored-by: JarbasAi Co-authored-by: JarbasAI <33701864+JarbasAl@users.noreply.github.com> --- .github/workflows/publish_alpha.yml | 2 +- .github/workflows/unit_tests.yml | 4 +- ovos_bus_client/apis/ocp.py | 624 +++++++++++++++++++++++----- requirements.txt | 2 +- 4 files changed, 522 insertions(+), 110 deletions(-) diff --git a/.github/workflows/publish_alpha.yml b/.github/workflows/publish_alpha.yml index a7fde29..86dcaca 100644 --- a/.github/workflows/publish_alpha.yml +++ b/.github/workflows/publish_alpha.yml @@ -14,7 +14,7 @@ on: - 'LICENSE' - 'CHANGELOG.md' - 'MANIFEST.in' - - 'readme.md' + - 'README.md' - 'scripts/**' workflow_dispatch: diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index e56a8cd..f151381 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -11,7 +11,7 @@ on: - 'LICENSE' - 'CHANGELOG.md' - 'MANIFEST.in' - - 'readme.md' + - 'README.md' - 'scripts/**' push: branches: @@ -25,7 +25,7 @@ on: - 'LICENSE' - 'CHANGELOG.md' - 'MANIFEST.in' - - 'readme.md' + - 'README.md' - 'scripts/**' workflow_dispatch: diff --git a/ovos_bus_client/apis/ocp.py b/ovos_bus_client/apis/ocp.py index c304fff..03c47a2 100644 --- a/ovos_bus_client/apis/ocp.py +++ b/ovos_bus_client/apis/ocp.py @@ -1,5 +1,3 @@ -# Copyright 2017 Mycroft AI Inc. -# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -18,12 +16,13 @@ from threading import Lock from typing import List -from ovos_bus_client.message import Message, dig_for_message -from ovos_bus_client.util import get_mycroft_bus from ovos_utils.gui import is_gui_connected, is_gui_running -from ovos_utils.log import LOG -from ovos_utils.messagebus import Message -from ovos_utils.ocp import MediaType, PlaybackType, PlaybackMode +from ovos_utils.log import LOG, deprecated +from ovos_utils.ocp import MediaType, PlaybackMode, Playlist, MediaEntry + +from ovos_bus_client.message import Message +from ovos_bus_client.message import dig_for_message +from ovos_bus_client.util import get_mycroft_bus def ensure_uri(s: str): @@ -53,13 +52,18 @@ def ensure_uri(s: str): class ClassicAudioServiceInterface: """AudioService class for interacting with the audio subsystem - Audio is managed by OCP in the default implementation, - usually this class should not be directly used, see OCPInterface instead + DEPRECATED: only works in ovos-core <= 0.0.8 + + it has been removed from ovos-audio with the move to ovos-media + + use OCPInterface instead Args: - bus: Mycroft messagebus connection + bus: OpenVoiceOS messagebus connection """ + @deprecated("removed from ovos-audio with the adoption of ovos-media service, " + "use OCPInterface instead", "0.1.0") def __init__(self, bus=None): self.bus = bus or get_mycroft_bus() @@ -223,7 +227,7 @@ def is_playing(self): class OCPInterface: """bus api interface for OCP subsystem Args: - bus: Mycroft messagebus connection + bus: OpenVoiceOS messagebus connection """ def __init__(self, bus=None): @@ -245,31 +249,45 @@ def _format_msg(self, msg_type, msg_data=None): return msg # OCP bus api - def queue(self, tracks): + @staticmethod + def norm_tracks(tracks: list): + """ensures a list of tracks contains only MediaEntry or Playlist items""" + assert isinstance(tracks, list) + # support Playlist and MediaEntry objects in tracks + for idx, track in enumerate(tracks): + if isinstance(track, dict): + tracks[idx] = MediaEntry.from_dict(track) + elif isinstance(track, list) and not isinstance(track, Playlist): + tracks[idx] = OCPInterface.norm_tracks(tracks) + else: + # TODO - support string uris + # let it fail in next assert + # log all bad entries before failing + LOG.error(f"Bad track, invalid type: {track}") + assert all(isinstance(t, (MediaEntry, Playlist)) for t in tracks) + return tracks + + def queue(self, tracks: list): """Queue up a track to OCP playing playlist. Args: tracks: track dict or list of track dicts (OCP result style) """ - - assert isinstance(tracks, list) - assert all(isinstance(t, dict) for t in tracks) - + tracks = self.norm_tracks(tracks) msg = self._format_msg('ovos.common_play.playlist.queue', {'tracks': tracks}) self.bus.emit(msg) - def play(self, tracks, utterance=None): + def play(self, tracks: list, utterance=None): """Start playback. Args: tracks: track dict or list of track dicts (OCP result style) utterance: forward utterance for further processing by OCP """ - assert isinstance(tracks, list) - assert all(isinstance(t, dict) for t in tracks) + tracks = self.norm_tracks(tracks) utterance = utterance or '' - + tracks = [t.as_dict for t in tracks] msg = self._format_msg('ovos.common_play.play', {"media": tracks[0], "playlist": tracks, @@ -373,6 +391,453 @@ def available_backends(self): return response.data if response else {} +class OCPAudioServiceInterface: + """Internal OCP audio subsystem + most likely you should use OCPInterface instead + NOTE: this class operates with uris not with MediaEntry/Playlist/dict entries + """ + + def __init__(self, bus=None): + self.bus = bus or get_mycroft_bus() + + def play(self, tracks=None, utterance=None, repeat=None): + """Start playback. + + Args: + tracks: track uri or list of track uri's + Each track can be added as a tuple with (uri, mime) + to give a hint of the mime type to the system + utterance: forward utterance for further processing by the + audio service. + repeat: if the playback should be looped + """ + repeat = repeat or False + tracks = tracks or [] + utterance = utterance or '' + if isinstance(tracks, (str, tuple)): + tracks = [tracks] + elif not isinstance(tracks, list): + raise ValueError + tracks = [ensure_uri(t) for t in tracks] + self.bus.emit(Message('ovos.audio.service.play', + data={'tracks': tracks, + 'utterance': utterance, + 'repeat': repeat})) + + def stop(self): + """Stop the track.""" + self.bus.emit(Message('ovos.audio.service.stop')) + + def next(self): + """Change to next track.""" + self.bus.emit(Message('ovos.audio.service.next')) + + def prev(self): + """Change to previous track.""" + self.bus.emit(Message('ovos.audio.service.prev')) + + def pause(self): + """Pause playback.""" + self.bus.emit(Message('ovos.audio.service.pause')) + + def resume(self): + """Resume paused playback.""" + self.bus.emit(Message('ovos.audio.service.resume')) + + def get_track_length(self): + """ + getting the duration of the audio in seconds + """ + length = 0 + info = self.bus.wait_for_response( + Message('ovos.audio.service.get_track_length'), + timeout=1) + if info: + length = info.data.get("length") or 0 + return length / 1000 # convert to seconds + + def get_track_position(self): + """ + get current position in seconds + """ + pos = 0 + info = self.bus.wait_for_response( + Message('ovos.audio.service.get_track_position'), + timeout=1) + if info: + pos = info.data.get("position") or 0 + return pos / 1000 # convert to seconds + + def set_track_position(self, seconds): + """Seek X seconds. + + Arguments: + seconds (int): number of seconds to seek, if negative rewind + """ + self.bus.emit(Message('ovos.audio.service.set_track_position', + {"position": seconds * 1000})) # convert to ms + + def seek(self, seconds=1): + """Seek X seconds. + + Args: + seconds (int): number of seconds to seek, if negative rewind + """ + if isinstance(seconds, timedelta): + seconds = seconds.total_seconds() + if seconds < 0: + self.seek_backward(abs(seconds)) + else: + self.seek_forward(seconds) + + def seek_forward(self, seconds=1): + """Skip ahead X seconds. + + Args: + seconds (int): number of seconds to skip + """ + if isinstance(seconds, timedelta): + seconds = seconds.total_seconds() + self.bus.emit(Message('ovos.audio.service.seek_forward', + {"seconds": seconds})) + + def seek_backward(self, seconds=1): + """Rewind X seconds + + Args: + seconds (int): number of seconds to rewind + """ + if isinstance(seconds, timedelta): + seconds = seconds.total_seconds() + self.bus.emit(Message('ovos.audio.service.seek_backward', + {"seconds": seconds})) + + def track_info(self): + """Request information of current playing track. + + Returns: + Dict with track info. + """ + info = self.bus.wait_for_response( + Message('ovos.audio.service.track_info'), + reply_type='ovos.audio.service.track_info_reply', + timeout=1) + return info.data if info else {} + + def available_backends(self): + """Return available audio backends. + + Returns: + dict with backend names as keys + """ + msg = Message('ovos.audio.service.list_backends') + response = self.bus.wait_for_response(msg) + return response.data if response else {} + + @property + def is_playing(self): + """True if the audioservice is playing, else False.""" + return self.track_info() != {} + + +class OCPVideoServiceInterface: + """Internal OCP video subsystem + most likely you should use OCPInterface instead + NOTE: this class operates with uris not with MediaEntry/Playlist/dict entries + """ + + def __init__(self, bus=None): + self.bus = bus or get_mycroft_bus() + + def play(self, tracks=None, utterance=None, repeat=None): + """Start playback. + + Args: + tracks: track uri or list of track uri's + Each track can be added as a tuple with (uri, mime) + to give a hint of the mime type to the system + utterance: forward utterance for further processing by the + video service. + repeat: if the playback should be looped + """ + repeat = repeat or False + tracks = tracks or [] + utterance = utterance or '' + if isinstance(tracks, (str, tuple)): + tracks = [tracks] + elif not isinstance(tracks, list): + raise ValueError + tracks = [ensure_uri(t) for t in tracks] + self.bus.emit(Message('ovos.video.service.play', + data={'tracks': tracks, + 'utterance': utterance, + 'repeat': repeat})) + + def stop(self): + """Stop the track.""" + self.bus.emit(Message('ovos.video.service.stop')) + + def next(self): + """Change to next track.""" + self.bus.emit(Message('ovos.video.service.next')) + + def prev(self): + """Change to previous track.""" + self.bus.emit(Message('ovos.video.service.prev')) + + def pause(self): + """Pause playback.""" + self.bus.emit(Message('ovos.video.service.pause')) + + def resume(self): + """Resume paused playback.""" + self.bus.emit(Message('ovos.video.service.resume')) + + def get_track_length(self): + """ + getting the duration of the video in seconds + """ + length = 0 + info = self.bus.wait_for_response( + Message('ovos.video.service.get_track_length'), + timeout=1) + if info: + length = info.data.get("length") or 0 + return length / 1000 # convert to seconds + + def get_track_position(self): + """ + get current position in seconds + """ + pos = 0 + info = self.bus.wait_for_response( + Message('ovos.video.service.get_track_position'), + timeout=1) + if info: + pos = info.data.get("position") or 0 + return pos / 1000 # convert to seconds + + def set_track_position(self, seconds): + """Seek X seconds. + + Arguments: + seconds (int): number of seconds to seek, if negative rewind + """ + self.bus.emit(Message('ovos.video.service.set_track_position', + {"position": seconds * 1000})) # convert to ms + + def seek(self, seconds=1): + """Seek X seconds. + + Args: + seconds (int): number of seconds to seek, if negative rewind + """ + if isinstance(seconds, timedelta): + seconds = seconds.total_seconds() + if seconds < 0: + self.seek_backward(abs(seconds)) + else: + self.seek_forward(seconds) + + def seek_forward(self, seconds=1): + """Skip ahead X seconds. + + Args: + seconds (int): number of seconds to skip + """ + if isinstance(seconds, timedelta): + seconds = seconds.total_seconds() + self.bus.emit(Message('ovos.video.service.seek_forward', + {"seconds": seconds})) + + def seek_backward(self, seconds=1): + """Rewind X seconds + + Args: + seconds (int): number of seconds to rewind + """ + if isinstance(seconds, timedelta): + seconds = seconds.total_seconds() + self.bus.emit(Message('ovos.video.service.seek_backward', + {"seconds": seconds})) + + def track_info(self): + """Request information of current playing track. + + Returns: + Dict with track info. + """ + info = self.bus.wait_for_response( + Message('ovos.video.service.track_info'), + reply_type='ovos.video.service.track_info_reply', + timeout=1) + return info.data if info else {} + + def available_backends(self): + """Return available video backends. + + Returns: + dict with backend names as keys + """ + msg = Message('ovos.video.service.list_backends') + response = self.bus.wait_for_response(msg) + return response.data if response else {} + + @property + def is_playing(self): + """True if the videoservice is playing, else False.""" + return self.track_info() != {} + + +class OCPWebServiceInterface: + """Internal OCP web view subsystem + most likely you should use OCPInterface instead + NOTE: this class operates with uris not with MediaEntry/Playlist/dict entries + """ + + def __init__(self, bus=None): + self.bus = bus or get_mycroft_bus() + + def play(self, tracks=None, utterance=None, repeat=None): + """Start playback. + + Args: + tracks: track uri or list of track uri's + Each track can be added as a tuple with (uri, mime) + to give a hint of the mime type to the system + utterance: forward utterance for further processing by the + web service. + repeat: if the playback should be looped + """ + repeat = repeat or False + tracks = tracks or [] + utterance = utterance or '' + if isinstance(tracks, (str, tuple)): + tracks = [tracks] + elif not isinstance(tracks, list): + raise ValueError + tracks = [ensure_uri(t) for t in tracks] + self.bus.emit(Message('ovos.web.service.play', + data={'tracks': tracks, + 'utterance': utterance, + 'repeat': repeat})) + + def stop(self): + """Stop the track.""" + self.bus.emit(Message('ovos.web.service.stop')) + + def next(self): + """Change to next track.""" + self.bus.emit(Message('ovos.web.service.next')) + + def prev(self): + """Change to previous track.""" + self.bus.emit(Message('ovos.web.service.prev')) + + def pause(self): + """Pause playback.""" + self.bus.emit(Message('ovos.web.service.pause')) + + def resume(self): + """Resume paused playback.""" + self.bus.emit(Message('ovos.web.service.resume')) + + def get_track_length(self): + """ + getting the duration of the web in seconds + """ + length = 0 + info = self.bus.wait_for_response( + Message('ovos.web.service.get_track_length'), + timeout=1) + if info: + length = info.data.get("length") or 0 + return length / 1000 # convert to seconds + + def get_track_position(self): + """ + get current position in seconds + """ + pos = 0 + info = self.bus.wait_for_response( + Message('ovos.web.service.get_track_position'), + timeout=1) + if info: + pos = info.data.get("position") or 0 + return pos / 1000 # convert to seconds + + def set_track_position(self, seconds): + """Seek X seconds. + + Arguments: + seconds (int): number of seconds to seek, if negative rewind + """ + self.bus.emit(Message('ovos.web.service.set_track_position', + {"position": seconds * 1000})) # convert to ms + + def seek(self, seconds=1): + """Seek X seconds. + + Args: + seconds (int): number of seconds to seek, if negative rewind + """ + if isinstance(seconds, timedelta): + seconds = seconds.total_seconds() + if seconds < 0: + self.seek_backward(abs(seconds)) + else: + self.seek_forward(seconds) + + def seek_forward(self, seconds=1): + """Skip ahead X seconds. + + Args: + seconds (int): number of seconds to skip + """ + if isinstance(seconds, timedelta): + seconds = seconds.total_seconds() + self.bus.emit(Message('ovos.web.service.seek_forward', + {"seconds": seconds})) + + def seek_backward(self, seconds=1): + """Rewind X seconds + + Args: + seconds (int): number of seconds to rewind + """ + if isinstance(seconds, timedelta): + seconds = seconds.total_seconds() + self.bus.emit(Message('ovos.web.service.seek_backward', + {"seconds": seconds})) + + def track_info(self): + """Request information of current playing track. + + Returns: + Dict with track info. + """ + info = self.bus.wait_for_response( + Message('ovos.web.service.track_info'), + reply_type='ovos.web.service.track_info_reply', + timeout=1) + return info.data if info else {} + + def available_backends(self): + """Return available web backends. + + Returns: + dict with backend names as keys + """ + msg = Message('ovos.web.service.list_backends') + response = self.bus.wait_for_response(msg) + return response.data if response else {} + + @property + def is_playing(self): + """True if the webservice is playing, else False.""" + return self.track_info() != {} + + class OCPQuery: cast2audio = [ MediaType.MUSIC, @@ -392,18 +857,7 @@ def __init__(self, query, bus, media_type=MediaType.GENERIC, config=None): self.config = config or {} self.reset() - def _get_available_extractors(self): - # TODO - implement a bus api, - # in containers the plugins wont be installed - # in the code using this api - try: # optional import - from ovos_plugin_manager.ocp import available_extractors - self.valid_uris = available_extractors() - except: - self.valid_uris = ["/", "http:", "https:", "file:"] - def reset(self): - self._get_available_extractors() self.active_skills = {} self.active_skills_lock = Lock() self.query_replies = [] @@ -439,8 +893,7 @@ def wait(self): @property def results(self) -> List[dict]: - return [s for s in self.query_replies - if s.get("results")] + return [s for s in self.query_replies if s.get("results")] def register_events(self): LOG.debug("Registering Search Bus Events") @@ -485,82 +938,41 @@ def handle_skill_response(self, message): # Collect replies until the timeout if not self.searching and not len(self.query_replies): LOG.debug(" too late!! ignored in track selection process") - LOG.warning( - f"{message.data['skill_id']} is not answering fast " - "enough!") + LOG.warning(f"{skill_id} is not answering fast enough!") + return # populate search playlist - results = message.data.get("results", []) - for idx, res in enumerate(results): - if self.media_type not in [MediaType.ADULT, MediaType.HENTAI]: - # skip adult content results unless explicitly enabled - if not self.config.get("adult_content", False) and \ - res.get("media_type", MediaType.GENERIC) in \ - [MediaType.ADULT, MediaType.HENTAI]: - continue - - # filter uris we can play, usually files and http streams, but some - # skills might return results that depend on additional packages, - # eg. soundcloud, rss, youtube, deezer.... - uri = res.get("uri", "") - if res.get("playlist") and not uri: - res["playlist"] = [ - r for r in res["playlist"] - if r.get("uri") and any(r.get("uri").startswith(e) - for e in self.valid_uris)] - if not len(res["playlist"]): - results[idx] = None # can't play this search result! - LOG.error(f"Empty playlist for {res}") - continue - elif uri and res.get("playback") not in [ - PlaybackType.SKILL, PlaybackType.UNDEFINED] and \ - not any( - uri.startswith(e) for e in self.valid_uris): - results[idx] = None # can't play this search result! - LOG.error(f"stream handler not available for {res}") - continue - - # filter video results if GUI not connected - if not self.has_gui: - # force allowed stream types to be played audio only - if res.get("media_type", "") in self.cast2audio: - LOG.debug("unable to use GUI, " - "forcing result to play audio only") - res["playback"] = PlaybackType.AUDIO - res["match_confidence"] -= 10 - results[idx] = res - - # remove filtered results - message.data["results"] = [r for r in results if r is not None] - LOG.debug(f'got {len(message.data["results"])} results from {skill_id}') - self.query_replies.append(message.data) - - # abort searching if we gathered enough results - # TODO ensure we have a decent confidence match, if all matches - # are < 50% conf extend timeout instead - if time.time() - self.search_start > self.query_timeouts: - if self.searching: - self.searching = False - LOG.debug("common play query timeout, parsing results") - - elif self.searching: - for res in message.data.get("results", []): - if res.get("match_confidence", 0) >= \ - self.config.get("early_stop_thresh", 85): - # got a really good match, dont search further - LOG.info( - "Receiving very high confidence match, stopping " - "search early") - - # allow other skills to "just miss" - early_stop_grace = \ - self.config.get("early_stop_grace_period", 0.5) - if early_stop_grace: - LOG.debug( - f" - grace period: {early_stop_grace} seconds") - time.sleep(early_stop_grace) + res = message.data.get("results", []) + LOG.debug(f'got {len(res)} results from {skill_id}') + if res: + self.query_replies.append(message.data) + + # abort searching if we gathered enough results + # TODO ensure we have a decent confidence match, if all matches + # are < 50% conf extend timeout instead + if time.time() - self.search_start > self.query_timeouts: + if self.searching: self.searching = False - return + LOG.debug("common play query timeout, parsing results") + + elif self.searching: + for res in message.data.get("results", []): + if res.get("match_confidence", 0) >= \ + self.config.get("early_stop_thresh", 85): + # got a really good match, dont search further + LOG.info( + "Receiving very high confidence match, stopping " + "search early") + + # allow other skills to "just miss" + early_stop_grace = \ + self.config.get("early_stop_grace_period", 0.5) + if early_stop_grace: + LOG.debug( + f" - grace period: {early_stop_grace} seconds") + time.sleep(early_stop_grace) + self.searching = False + return def handle_skill_search_end(self, message): skill_id = message.data["skill_id"] diff --git a/requirements.txt b/requirements.txt index 6a6feee..c78a134 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ ovos-config >= 0.0.12, < 0.2.0 -ovos-utils >= 0.1.0a7, < 0.2.0 +ovos-utils >= 0.1.0a9, < 0.2.0 websocket-client>=0.54.0 pyee>=8.1.0, < 9.0.0