From dfed6652df2017fbfdf653566c41019f8134353a Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 6 May 2024 15:56:59 -0700 Subject: [PATCH 1/4] Update imports to support ovos-utils~=0.0.x with compat. warnings --- ovos_bus_client/apis/ocp.py | 428 +++++++++++++++++++++++++++++++++++- requirements.txt | 2 +- 2 files changed, 428 insertions(+), 2 deletions(-) diff --git a/ovos_bus_client/apis/ocp.py b/ovos_bus_client/apis/ocp.py index 296498c..96522a2 100644 --- a/ovos_bus_client/apis/ocp.py +++ b/ovos_bus_client/apis/ocp.py @@ -18,12 +18,438 @@ from ovos_utils.gui import is_gui_connected, is_gui_running from ovos_utils.log import LOG, deprecated -from ovos_utils.ocp import MediaType, PlaybackMode, Playlist, MediaEntry from ovos_bus_client.message import Message from ovos_bus_client.message import dig_for_message from ovos_bus_client.util import get_mycroft_bus +try: + from ovos_utils.ocp import MediaType, PlaybackMode, Playlist, MediaEntry +except ImportError: + import inspect + from enum import IntEnum + from typing import Optional, Union, Tuple + from dataclasses import dataclass + LOG.warning("ovos-utils~=0.1 not installed. Patching missing imports") + + import mimetypes + import orjson + + OCP_ID = "ovos.common_play" + + + def find_mime(uri): + """ Determine mime type. """ + mime = mimetypes.guess_type(uri) + if mime: + return mime + else: + return None + + + class MediaType(IntEnum): + GENERIC = 0 # nothing else matches + AUDIO = 1 # things like ambient noises + MUSIC = 2 + VIDEO = 3 # eg, youtube videos + AUDIOBOOK = 4 + GAME = 5 # because it shares the verb "play", mostly for disambguation + PODCAST = 6 + RADIO = 7 # live radio + NEWS = 8 # news reports + TV = 9 # live tv stream + MOVIE = 10 + TRAILER = 11 + AUDIO_DESCRIPTION = 12 # narrated movie for the blind + VISUAL_STORY = 13 # things like animated comic books + BEHIND_THE_SCENES = 14 + DOCUMENTARY = 15 + RADIO_THEATRE = 16 + SHORT_FILM = 17 # typically movies under 45 min + SILENT_MOVIE = 18 + VIDEO_EPISODES = 19 # tv series etc + BLACK_WHITE_MOVIE = 20 + CARTOON = 21 + ANIME = 22 + ASMR = 23 + + ADULT = 69 # for content filtering + HENTAI = 70 # for content filtering + ADULT_AUDIO = 71 # for content filtering + + + class PlaybackMode(IntEnum): + AUTO = 0 # play each entry as considered appropriate, + # ie, make it happen the best way possible + AUDIO_ONLY = 10 # only consider audio entries + VIDEO_ONLY = 20 # only consider video entries + FORCE_AUDIO = 30 # cast video to audio unconditionally + FORCE_AUDIOSERVICE = 40 ## DEPRECATED - used in ovos 0.0.7 + EVENTS_ONLY = 50 # only emit ocp events, do not display or play anything. + # allows integration with external interfaces + + + class PlaybackType(IntEnum): + SKILL = 0 # skills handle playback whatever way they see fit, + # eg spotify / mycroft common play + VIDEO = 1 # Video results + AUDIO = 2 # Results should be played audio only + AUDIO_SERVICE = 3 ## DEPRECATED - used in ovos 0.0.7 + MPRIS = 4 # External MPRIS compliant player + WEBVIEW = 5 # webview, render a url instead of media player + UNDEFINED = 100 # data not available, hopefully status will be updated soon.. + + + class TrackState(IntEnum): + DISAMBIGUATION = 1 # media result, not queued for playback + PLAYING_SKILL = 20 # Skill is handling playback internally + PLAYING_AUDIOSERVICE = 21 ## DEPRECATED - used in ovos 0.0.7 + PLAYING_VIDEO = 22 # Skill forwarded playback to video service + PLAYING_AUDIO = 23 # Skill forwarded playback to audio service + PLAYING_MPRIS = 24 # External media player is handling playback + PLAYING_WEBVIEW = 25 # Media playback handled in browser (eg. javascript) + + QUEUED_SKILL = 30 # Waiting playback to be handled inside skill + QUEUED_AUDIOSERVICE = 31 ## DEPRECATED - used in ovos 0.0.7 + QUEUED_VIDEO = 32 # Waiting playback in video service + QUEUED_AUDIO = 33 # Waiting playback in audio service + QUEUED_WEBVIEW = 34 # Waiting playback in browser service + + + @dataclass + class Playlist(list): + title: str = "" + position: int = 0 + length: int = 0 # in seconds + image: str = "" + match_confidence: int = 0 # 0 - 100 + skill_id: str = OCP_ID + skill_icon: str = "" + + def __init__(self, *args, **kwargs): + super().__init__(**kwargs) + list.__init__(self, *args) + + @property + def infocard(self) -> dict: + """ + Return dict data used for a UI display + (model shared with MediaEntry) + """ + return { + "duration": self.length, + "track": self.title, + "image": self.image, + "album": self.skill_id, + "source": self.skill_icon, + "uri": "" + } + + @staticmethod + def from_dict(track: dict): + return MediaEntry.from_dict(track) + + @property + def as_dict(self) -> dict: + """ + Return a dict representation of this MediaEntry + """ + data = { + "title": self.title, + "position": self.position, + "length": self.length, + "image": self.image, + "match_confidence": self.match_confidence, + "skill_id": self.skill_id, + "skill_icon": self.skill_icon, + "playlist": [e.as_dict for e in self.entries] + } + return data + + @property + def entries(self) -> List[MediaEntry]: + """ + Return a list of MediaEntry objects in the playlist + """ + entries = [] + for e in self: + if isinstance(e, dict): + e = MediaEntry.from_dict(e) + if isinstance(e, MediaEntry): + entries.append(e) + return entries + + @property + def current_track(self) -> Optional[MediaEntry]: + """ + Return the current MediaEntry or None if the playlist is empty + """ + if len(self) == 0: + return None + self._validate_position() + track = self[self.position] + if isinstance(track, dict): + track = MediaEntry.from_dict(track) + return track + + @property + def is_first_track(self) -> bool: + """ + Return `True` if the current position is the first track or if the + playlist is empty + """ + if len(self) == 0: + return True + return self.position == 0 + + @property + def is_last_track(self) -> bool: + """ + Return `True` if the current position is the last track of if the + playlist is empty + """ + if len(self) == 0: + return True + return self.position == len(self) - 1 + + def goto_start(self) -> None: + """ + Move to the first entry in the playlist + """ + self.position = 0 + + def clear(self) -> None: + """ + Remove all entries from the Playlist and reset the position + """ + super().clear() + self.position = 0 + + def sort_by_conf(self): + """ + Sort the Playlist by `match_confidence` with high confidence first + """ + self.sort( + key=lambda k: k.match_confidence if isinstance(k, (MediaEntry, Playlist)) + else k.get("match_confidence", 0), reverse=True) + + def add_entry(self, entry: MediaEntry, index: int = -1) -> None: + """ + Add an entry at the requested index + @param entry: MediaEntry to add to playlist + @param index: index to insert entry at (default -1 to append) + """ + assert isinstance(index, int) + if index > len(self): + raise ValueError(f"Invalid index {index} requested, " + f"playlist only has {len(self)} entries") + + if isinstance(entry, dict): + entry = MediaEntry.from_dict(entry) + + assert isinstance(entry, (MediaEntry, Playlist)) + + if index == -1: + index = len(self) + + if index < self.position: + self.set_position(self.position + 1) + + self.insert(index, entry) + + def remove_entry(self, entry: Union[int, dict, MediaEntry]) -> None: + """ + Remove the requested entry from the playlist or raise a ValueError + @param entry: index or MediaEntry to remove from the playlist + """ + if isinstance(entry, int): + self.pop(entry) + return + if isinstance(entry, dict): + entry = MediaEntry.from_dict(entry) + assert isinstance(entry, MediaEntry) + for idx, e in enumerate(self.entries): + if e == entry: + self.pop(idx) + break + else: + raise ValueError(f"entry not in playlist: {entry}") + + def replace(self, new_list: List[Union[dict, MediaEntry]]) -> None: + """ + Replace the contents of this Playlist with new_list + @param new_list: list of MediaEntry or dict objects to set this list to + """ + self.clear() + for e in new_list: + self.add_entry(e) + + def set_position(self, idx: int): + """ + Set the position in the playlist to a specific index + @param idx: Index to set position to + """ + self.position = idx + self._validate_position() + + def goto_track(self, track: Union[MediaEntry, dict]) -> None: + """ + Go to the requested track in the playlist + @param track: MediaEntry to find and go to in the playlist + """ + if isinstance(track, dict): + track = MediaEntry.from_dict(track) + + assert isinstance(track, (MediaEntry, Playlist)) + + if isinstance(track, MediaEntry): + requested_uri = track.uri + else: + requested_uri = track.title + + for idx, t in enumerate(self): + if isinstance(t, MediaEntry): + pl_entry_uri = t.uri + else: + pl_entry_uri = t.title + + if requested_uri == pl_entry_uri: + self.set_position(idx) + LOG.debug(f"New playlist position: {self.position}") + return + LOG.error(f"requested track not in the playlist: {track}") + + def next_track(self) -> None: + """ + Go to the next track in the playlist + """ + self.set_position(self.position + 1) + + def prev_track(self) -> None: + """ + Go to the previous track in the playlist + """ + self.set_position(self.position - 1) + + def _validate_position(self) -> None: + """ + Make sure the current position is valid; default `position` to 0 + """ + if self.position < 0 or self.position >= len(self): + LOG.error(f"Playlist pointer is in an invalid position " + f"({self.position}! Going to start of playlist") + self.position = 0 + + def __contains__(self, item): + if isinstance(item, dict): + item = MediaEntry.from_dict(item) + if isinstance(item, MediaEntry): + for e in self.entries: + if e.uri == item.uri: + return True + return False + + + @dataclass + class MediaEntry: + uri: str = "" + title: str = "" + artist: str = "" + match_confidence: int = 0 # 0 - 100 + skill_id: str = OCP_ID + playback: PlaybackType = PlaybackType.UNDEFINED + status: TrackState = TrackState.DISAMBIGUATION + media_type: MediaType = MediaType.GENERIC + length: int = 0 # in seconds + image: str = "" + skill_icon: str = "" + javascript: str = "" # to execute once webview is loaded + + def update(self, entry: dict, skipkeys: list = None, newonly: bool = False): + """ + Update this MediaEntry object with keys from the provided entry + @param entry: dict or MediaEntry object to update this object with + @param skipkeys: list of keys to not change + @param newonly: if True, only adds new keys; existing keys are unchanged + """ + skipkeys = skipkeys or [] + if isinstance(entry, MediaEntry): + entry = entry.as_dict + entry = entry or {} + for k, v in entry.items(): + if k not in skipkeys and hasattr(self, k): + if newonly and self.__getattribute__(k): + # skip, do not replace existing values + continue + self.__setattr__(k, v) + + @property + def infocard(self) -> dict: + """ + Return dict data used for a UI display + """ + return { + "duration": self.length, + "track": self.title, + "image": self.image, + "album": self.skill_id, + "source": self.skill_icon, + "uri": self.uri + } + + @property + def mpris_metadata(self) -> dict: + """ + Return dict data used by MPRIS + """ + from dbus_next.service import Variant + meta = {"xesam:url": Variant('s', self.uri)} + if self.artist: + meta['xesam:artist'] = Variant('as', [self.artist]) + if self.title: + meta['xesam:title'] = Variant('s', self.title) + if self.image: + meta['mpris:artUrl'] = Variant('s', self.image) + if self.length: + meta['mpris:length'] = Variant('d', self.length) + return meta + + @property + def as_dict(self) -> dict: + """ + Return a dict representation of this MediaEntry + """ + # orjson handles dataclasses directly + return orjson.loads(orjson.dumps(self).decode("utf-8")) + + @staticmethod + def from_dict(track: dict): + if track.get("playlist"): + kwargs = {k: v for k, v in track.items() + if k in inspect.signature(Playlist).parameters} + playlist = Playlist(**kwargs) + for e in track["playlist"]: + playlist.add_entry(e) + return playlist + else: + kwargs = {k: v for k, v in track.items() + if k in inspect.signature(MediaEntry).parameters} + return MediaEntry(**kwargs) + + @property + def mimetype(self) -> Optional[Tuple[Optional[str], Optional[str]]]: + """ + Get the detected mimetype tuple (type, encoding) if it can be determined + """ + if self.uri: + return find_mime(self.uri) + + def __eq__(self, other): + if isinstance(other, MediaEntry): + other = other.infocard + # dict comparison + return other == self.infocard + def ensure_uri(s: str): """ diff --git a/requirements.txt b/requirements.txt index fce3fce..3b1195e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ ovos-config >= 0.0.12, < 0.2.0 -ovos-utils >= 0.1.0a9, < 0.2.0 +ovos-utils >= 0.0.38, < 0.2.0 websocket-client>=0.54.0 pyee>=8.1.0, < 9.0.0 orjson \ No newline at end of file From 22fd2c4a90b469e4d4f70aaa33f95500a09cde1b Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 6 May 2024 16:03:58 -0700 Subject: [PATCH 2/4] Refactor to resolve reference error https://github.com/NeonGeckoCom/NeonCore/actions/runs/8976886241/job/24654634811?pr=641 --- ovos_bus_client/apis/ocp.py | 202 ++++++++++++++++++------------------ 1 file changed, 101 insertions(+), 101 deletions(-) diff --git a/ovos_bus_client/apis/ocp.py b/ovos_bus_client/apis/ocp.py index 96522a2..60a8a45 100644 --- a/ovos_bus_client/apis/ocp.py +++ b/ovos_bus_client/apis/ocp.py @@ -116,6 +116,107 @@ class TrackState(IntEnum): QUEUED_WEBVIEW = 34 # Waiting playback in browser service + @dataclass + class MediaEntry: + uri: str = "" + title: str = "" + artist: str = "" + match_confidence: int = 0 # 0 - 100 + skill_id: str = OCP_ID + playback: PlaybackType = PlaybackType.UNDEFINED + status: TrackState = TrackState.DISAMBIGUATION + media_type: MediaType = MediaType.GENERIC + length: int = 0 # in seconds + image: str = "" + skill_icon: str = "" + javascript: str = "" # to execute once webview is loaded + + def update(self, entry: dict, skipkeys: list = None, newonly: bool = False): + """ + Update this MediaEntry object with keys from the provided entry + @param entry: dict or MediaEntry object to update this object with + @param skipkeys: list of keys to not change + @param newonly: if True, only adds new keys; existing keys are unchanged + """ + skipkeys = skipkeys or [] + if isinstance(entry, MediaEntry): + entry = entry.as_dict + entry = entry or {} + for k, v in entry.items(): + if k not in skipkeys and hasattr(self, k): + if newonly and self.__getattribute__(k): + # skip, do not replace existing values + continue + self.__setattr__(k, v) + + @property + def infocard(self) -> dict: + """ + Return dict data used for a UI display + """ + return { + "duration": self.length, + "track": self.title, + "image": self.image, + "album": self.skill_id, + "source": self.skill_icon, + "uri": self.uri + } + + @property + def mpris_metadata(self) -> dict: + """ + Return dict data used by MPRIS + """ + from dbus_next.service import Variant + meta = {"xesam:url": Variant('s', self.uri)} + if self.artist: + meta['xesam:artist'] = Variant('as', [self.artist]) + if self.title: + meta['xesam:title'] = Variant('s', self.title) + if self.image: + meta['mpris:artUrl'] = Variant('s', self.image) + if self.length: + meta['mpris:length'] = Variant('d', self.length) + return meta + + @property + def as_dict(self) -> dict: + """ + Return a dict representation of this MediaEntry + """ + # orjson handles dataclasses directly + return orjson.loads(orjson.dumps(self).decode("utf-8")) + + @staticmethod + def from_dict(track: dict): + if track.get("playlist"): + kwargs = {k: v for k, v in track.items() + if k in inspect.signature(Playlist).parameters} + playlist = Playlist(**kwargs) + for e in track["playlist"]: + playlist.add_entry(e) + return playlist + else: + kwargs = {k: v for k, v in track.items() + if k in inspect.signature(MediaEntry).parameters} + return MediaEntry(**kwargs) + + @property + def mimetype(self) -> Optional[Tuple[Optional[str], Optional[str]]]: + """ + Get the detected mimetype tuple (type, encoding) if it can be determined + """ + if self.uri: + return find_mime(self.uri) + + def __eq__(self, other): + if isinstance(other, MediaEntry): + other = other.infocard + # dict comparison + return other == self.infocard + + @dataclass class Playlist(list): title: str = "" @@ -350,107 +451,6 @@ def __contains__(self, item): return False - @dataclass - class MediaEntry: - uri: str = "" - title: str = "" - artist: str = "" - match_confidence: int = 0 # 0 - 100 - skill_id: str = OCP_ID - playback: PlaybackType = PlaybackType.UNDEFINED - status: TrackState = TrackState.DISAMBIGUATION - media_type: MediaType = MediaType.GENERIC - length: int = 0 # in seconds - image: str = "" - skill_icon: str = "" - javascript: str = "" # to execute once webview is loaded - - def update(self, entry: dict, skipkeys: list = None, newonly: bool = False): - """ - Update this MediaEntry object with keys from the provided entry - @param entry: dict or MediaEntry object to update this object with - @param skipkeys: list of keys to not change - @param newonly: if True, only adds new keys; existing keys are unchanged - """ - skipkeys = skipkeys or [] - if isinstance(entry, MediaEntry): - entry = entry.as_dict - entry = entry or {} - for k, v in entry.items(): - if k not in skipkeys and hasattr(self, k): - if newonly and self.__getattribute__(k): - # skip, do not replace existing values - continue - self.__setattr__(k, v) - - @property - def infocard(self) -> dict: - """ - Return dict data used for a UI display - """ - return { - "duration": self.length, - "track": self.title, - "image": self.image, - "album": self.skill_id, - "source": self.skill_icon, - "uri": self.uri - } - - @property - def mpris_metadata(self) -> dict: - """ - Return dict data used by MPRIS - """ - from dbus_next.service import Variant - meta = {"xesam:url": Variant('s', self.uri)} - if self.artist: - meta['xesam:artist'] = Variant('as', [self.artist]) - if self.title: - meta['xesam:title'] = Variant('s', self.title) - if self.image: - meta['mpris:artUrl'] = Variant('s', self.image) - if self.length: - meta['mpris:length'] = Variant('d', self.length) - return meta - - @property - def as_dict(self) -> dict: - """ - Return a dict representation of this MediaEntry - """ - # orjson handles dataclasses directly - return orjson.loads(orjson.dumps(self).decode("utf-8")) - - @staticmethod - def from_dict(track: dict): - if track.get("playlist"): - kwargs = {k: v for k, v in track.items() - if k in inspect.signature(Playlist).parameters} - playlist = Playlist(**kwargs) - for e in track["playlist"]: - playlist.add_entry(e) - return playlist - else: - kwargs = {k: v for k, v in track.items() - if k in inspect.signature(MediaEntry).parameters} - return MediaEntry(**kwargs) - - @property - def mimetype(self) -> Optional[Tuple[Optional[str], Optional[str]]]: - """ - Get the detected mimetype tuple (type, encoding) if it can be determined - """ - if self.uri: - return find_mime(self.uri) - - def __eq__(self, other): - if isinstance(other, MediaEntry): - other = other.infocard - # dict comparison - return other == self.infocard - - def ensure_uri(s: str): """ Interpret paths as file:// uri's. From a234efeb7fa2d789b76c7e8dbcfdd7c31e7118df Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 6 May 2024 17:08:11 -0700 Subject: [PATCH 3/4] Add type annotations to resolve warnings Remove patched OCP imports and move imports into classes/methods as requested --- ovos_bus_client/apis/ocp.py | 469 +++--------------------------------- 1 file changed, 30 insertions(+), 439 deletions(-) diff --git a/ovos_bus_client/apis/ocp.py b/ovos_bus_client/apis/ocp.py index 60a8a45..2bf2efe 100644 --- a/ovos_bus_client/apis/ocp.py +++ b/ovos_bus_client/apis/ocp.py @@ -14,7 +14,7 @@ from datetime import timedelta from os.path import abspath from threading import Lock -from typing import List +from typing import List, Union from ovos_utils.gui import is_gui_connected, is_gui_running from ovos_utils.log import LOG, deprecated @@ -23,433 +23,6 @@ from ovos_bus_client.message import dig_for_message from ovos_bus_client.util import get_mycroft_bus -try: - from ovos_utils.ocp import MediaType, PlaybackMode, Playlist, MediaEntry -except ImportError: - import inspect - from enum import IntEnum - from typing import Optional, Union, Tuple - from dataclasses import dataclass - LOG.warning("ovos-utils~=0.1 not installed. Patching missing imports") - - import mimetypes - import orjson - - OCP_ID = "ovos.common_play" - - - def find_mime(uri): - """ Determine mime type. """ - mime = mimetypes.guess_type(uri) - if mime: - return mime - else: - return None - - - class MediaType(IntEnum): - GENERIC = 0 # nothing else matches - AUDIO = 1 # things like ambient noises - MUSIC = 2 - VIDEO = 3 # eg, youtube videos - AUDIOBOOK = 4 - GAME = 5 # because it shares the verb "play", mostly for disambguation - PODCAST = 6 - RADIO = 7 # live radio - NEWS = 8 # news reports - TV = 9 # live tv stream - MOVIE = 10 - TRAILER = 11 - AUDIO_DESCRIPTION = 12 # narrated movie for the blind - VISUAL_STORY = 13 # things like animated comic books - BEHIND_THE_SCENES = 14 - DOCUMENTARY = 15 - RADIO_THEATRE = 16 - SHORT_FILM = 17 # typically movies under 45 min - SILENT_MOVIE = 18 - VIDEO_EPISODES = 19 # tv series etc - BLACK_WHITE_MOVIE = 20 - CARTOON = 21 - ANIME = 22 - ASMR = 23 - - ADULT = 69 # for content filtering - HENTAI = 70 # for content filtering - ADULT_AUDIO = 71 # for content filtering - - - class PlaybackMode(IntEnum): - AUTO = 0 # play each entry as considered appropriate, - # ie, make it happen the best way possible - AUDIO_ONLY = 10 # only consider audio entries - VIDEO_ONLY = 20 # only consider video entries - FORCE_AUDIO = 30 # cast video to audio unconditionally - FORCE_AUDIOSERVICE = 40 ## DEPRECATED - used in ovos 0.0.7 - EVENTS_ONLY = 50 # only emit ocp events, do not display or play anything. - # allows integration with external interfaces - - - class PlaybackType(IntEnum): - SKILL = 0 # skills handle playback whatever way they see fit, - # eg spotify / mycroft common play - VIDEO = 1 # Video results - AUDIO = 2 # Results should be played audio only - AUDIO_SERVICE = 3 ## DEPRECATED - used in ovos 0.0.7 - MPRIS = 4 # External MPRIS compliant player - WEBVIEW = 5 # webview, render a url instead of media player - UNDEFINED = 100 # data not available, hopefully status will be updated soon.. - - - class TrackState(IntEnum): - DISAMBIGUATION = 1 # media result, not queued for playback - PLAYING_SKILL = 20 # Skill is handling playback internally - PLAYING_AUDIOSERVICE = 21 ## DEPRECATED - used in ovos 0.0.7 - PLAYING_VIDEO = 22 # Skill forwarded playback to video service - PLAYING_AUDIO = 23 # Skill forwarded playback to audio service - PLAYING_MPRIS = 24 # External media player is handling playback - PLAYING_WEBVIEW = 25 # Media playback handled in browser (eg. javascript) - - QUEUED_SKILL = 30 # Waiting playback to be handled inside skill - QUEUED_AUDIOSERVICE = 31 ## DEPRECATED - used in ovos 0.0.7 - QUEUED_VIDEO = 32 # Waiting playback in video service - QUEUED_AUDIO = 33 # Waiting playback in audio service - QUEUED_WEBVIEW = 34 # Waiting playback in browser service - - - @dataclass - class MediaEntry: - uri: str = "" - title: str = "" - artist: str = "" - match_confidence: int = 0 # 0 - 100 - skill_id: str = OCP_ID - playback: PlaybackType = PlaybackType.UNDEFINED - status: TrackState = TrackState.DISAMBIGUATION - media_type: MediaType = MediaType.GENERIC - length: int = 0 # in seconds - image: str = "" - skill_icon: str = "" - javascript: str = "" # to execute once webview is loaded - - def update(self, entry: dict, skipkeys: list = None, newonly: bool = False): - """ - Update this MediaEntry object with keys from the provided entry - @param entry: dict or MediaEntry object to update this object with - @param skipkeys: list of keys to not change - @param newonly: if True, only adds new keys; existing keys are unchanged - """ - skipkeys = skipkeys or [] - if isinstance(entry, MediaEntry): - entry = entry.as_dict - entry = entry or {} - for k, v in entry.items(): - if k not in skipkeys and hasattr(self, k): - if newonly and self.__getattribute__(k): - # skip, do not replace existing values - continue - self.__setattr__(k, v) - - @property - def infocard(self) -> dict: - """ - Return dict data used for a UI display - """ - return { - "duration": self.length, - "track": self.title, - "image": self.image, - "album": self.skill_id, - "source": self.skill_icon, - "uri": self.uri - } - - @property - def mpris_metadata(self) -> dict: - """ - Return dict data used by MPRIS - """ - from dbus_next.service import Variant - meta = {"xesam:url": Variant('s', self.uri)} - if self.artist: - meta['xesam:artist'] = Variant('as', [self.artist]) - if self.title: - meta['xesam:title'] = Variant('s', self.title) - if self.image: - meta['mpris:artUrl'] = Variant('s', self.image) - if self.length: - meta['mpris:length'] = Variant('d', self.length) - return meta - - @property - def as_dict(self) -> dict: - """ - Return a dict representation of this MediaEntry - """ - # orjson handles dataclasses directly - return orjson.loads(orjson.dumps(self).decode("utf-8")) - - @staticmethod - def from_dict(track: dict): - if track.get("playlist"): - kwargs = {k: v for k, v in track.items() - if k in inspect.signature(Playlist).parameters} - playlist = Playlist(**kwargs) - for e in track["playlist"]: - playlist.add_entry(e) - return playlist - else: - kwargs = {k: v for k, v in track.items() - if k in inspect.signature(MediaEntry).parameters} - return MediaEntry(**kwargs) - - @property - def mimetype(self) -> Optional[Tuple[Optional[str], Optional[str]]]: - """ - Get the detected mimetype tuple (type, encoding) if it can be determined - """ - if self.uri: - return find_mime(self.uri) - - def __eq__(self, other): - if isinstance(other, MediaEntry): - other = other.infocard - # dict comparison - return other == self.infocard - - - @dataclass - class Playlist(list): - title: str = "" - position: int = 0 - length: int = 0 # in seconds - image: str = "" - match_confidence: int = 0 # 0 - 100 - skill_id: str = OCP_ID - skill_icon: str = "" - - def __init__(self, *args, **kwargs): - super().__init__(**kwargs) - list.__init__(self, *args) - - @property - def infocard(self) -> dict: - """ - Return dict data used for a UI display - (model shared with MediaEntry) - """ - return { - "duration": self.length, - "track": self.title, - "image": self.image, - "album": self.skill_id, - "source": self.skill_icon, - "uri": "" - } - - @staticmethod - def from_dict(track: dict): - return MediaEntry.from_dict(track) - - @property - def as_dict(self) -> dict: - """ - Return a dict representation of this MediaEntry - """ - data = { - "title": self.title, - "position": self.position, - "length": self.length, - "image": self.image, - "match_confidence": self.match_confidence, - "skill_id": self.skill_id, - "skill_icon": self.skill_icon, - "playlist": [e.as_dict for e in self.entries] - } - return data - - @property - def entries(self) -> List[MediaEntry]: - """ - Return a list of MediaEntry objects in the playlist - """ - entries = [] - for e in self: - if isinstance(e, dict): - e = MediaEntry.from_dict(e) - if isinstance(e, MediaEntry): - entries.append(e) - return entries - - @property - def current_track(self) -> Optional[MediaEntry]: - """ - Return the current MediaEntry or None if the playlist is empty - """ - if len(self) == 0: - return None - self._validate_position() - track = self[self.position] - if isinstance(track, dict): - track = MediaEntry.from_dict(track) - return track - - @property - def is_first_track(self) -> bool: - """ - Return `True` if the current position is the first track or if the - playlist is empty - """ - if len(self) == 0: - return True - return self.position == 0 - - @property - def is_last_track(self) -> bool: - """ - Return `True` if the current position is the last track of if the - playlist is empty - """ - if len(self) == 0: - return True - return self.position == len(self) - 1 - - def goto_start(self) -> None: - """ - Move to the first entry in the playlist - """ - self.position = 0 - - def clear(self) -> None: - """ - Remove all entries from the Playlist and reset the position - """ - super().clear() - self.position = 0 - - def sort_by_conf(self): - """ - Sort the Playlist by `match_confidence` with high confidence first - """ - self.sort( - key=lambda k: k.match_confidence if isinstance(k, (MediaEntry, Playlist)) - else k.get("match_confidence", 0), reverse=True) - - def add_entry(self, entry: MediaEntry, index: int = -1) -> None: - """ - Add an entry at the requested index - @param entry: MediaEntry to add to playlist - @param index: index to insert entry at (default -1 to append) - """ - assert isinstance(index, int) - if index > len(self): - raise ValueError(f"Invalid index {index} requested, " - f"playlist only has {len(self)} entries") - - if isinstance(entry, dict): - entry = MediaEntry.from_dict(entry) - - assert isinstance(entry, (MediaEntry, Playlist)) - - if index == -1: - index = len(self) - - if index < self.position: - self.set_position(self.position + 1) - - self.insert(index, entry) - - def remove_entry(self, entry: Union[int, dict, MediaEntry]) -> None: - """ - Remove the requested entry from the playlist or raise a ValueError - @param entry: index or MediaEntry to remove from the playlist - """ - if isinstance(entry, int): - self.pop(entry) - return - if isinstance(entry, dict): - entry = MediaEntry.from_dict(entry) - assert isinstance(entry, MediaEntry) - for idx, e in enumerate(self.entries): - if e == entry: - self.pop(idx) - break - else: - raise ValueError(f"entry not in playlist: {entry}") - - def replace(self, new_list: List[Union[dict, MediaEntry]]) -> None: - """ - Replace the contents of this Playlist with new_list - @param new_list: list of MediaEntry or dict objects to set this list to - """ - self.clear() - for e in new_list: - self.add_entry(e) - - def set_position(self, idx: int): - """ - Set the position in the playlist to a specific index - @param idx: Index to set position to - """ - self.position = idx - self._validate_position() - - def goto_track(self, track: Union[MediaEntry, dict]) -> None: - """ - Go to the requested track in the playlist - @param track: MediaEntry to find and go to in the playlist - """ - if isinstance(track, dict): - track = MediaEntry.from_dict(track) - - assert isinstance(track, (MediaEntry, Playlist)) - - if isinstance(track, MediaEntry): - requested_uri = track.uri - else: - requested_uri = track.title - - for idx, t in enumerate(self): - if isinstance(t, MediaEntry): - pl_entry_uri = t.uri - else: - pl_entry_uri = t.title - - if requested_uri == pl_entry_uri: - self.set_position(idx) - LOG.debug(f"New playlist position: {self.position}") - return - LOG.error(f"requested track not in the playlist: {track}") - - def next_track(self) -> None: - """ - Go to the next track in the playlist - """ - self.set_position(self.position + 1) - - def prev_track(self) -> None: - """ - Go to the previous track in the playlist - """ - self.set_position(self.position - 1) - - def _validate_position(self) -> None: - """ - Make sure the current position is valid; default `position` to 0 - """ - if self.position < 0 or self.position >= len(self): - LOG.error(f"Playlist pointer is in an invalid position " - f"({self.position}! Going to start of playlist") - self.position = 0 - - def __contains__(self, item): - if isinstance(item, dict): - item = MediaEntry.from_dict(item) - if isinstance(item, MediaEntry): - for e in self.entries: - if e.uri == item.uri: - return True - return False - def ensure_uri(s: str): """ @@ -589,7 +162,7 @@ def set_track_position(self, seconds): self.bus.emit(Message('mycroft.audio.service.set_track_position', {"position": seconds * 1000})) # convert to ms - def seek(self, seconds=1): + def seek(self, seconds: Union[int, float, timedelta] = 1): """Seek X seconds. Args: @@ -602,7 +175,7 @@ def seek(self, seconds=1): else: self.seek_forward(seconds) - def seek_forward(self, seconds=1): + def seek_forward(self, seconds: Union[int, float, timedelta] = 1): """Skip ahead X seconds. Args: @@ -613,7 +186,7 @@ def seek_forward(self, seconds=1): self.bus.emit(Message('mycroft.audio.service.seek_forward', {"seconds": seconds})) - def seek_backward(self, seconds=1): + def seek_backward(self, seconds: Union[int, float, timedelta] = 1): """Rewind X seconds Args: @@ -679,6 +252,11 @@ def _format_msg(self, msg_type, msg_data=None): # OCP bus api @staticmethod def norm_tracks(tracks: list): + try: + from ovos_utils.ocp import Playlist, MediaEntry + except ImportError as e: + raise RuntimeError("This method requires ovos-utils ~=0.1") from e + """ensures a list of tracks contains only MediaEntry or Playlist items""" assert isinstance(tracks, list) # support Playlist and MediaEntry objects in tracks @@ -905,7 +483,7 @@ def set_track_position(self, seconds): self.bus.emit(Message('ovos.audio.service.set_track_position', {"position": seconds * 1000})) # convert to ms - def seek(self, seconds=1): + def seek(self, seconds: Union[int, float, timedelta] = 1): """Seek X seconds. Args: @@ -918,7 +496,7 @@ def seek(self, seconds=1): else: self.seek_forward(seconds) - def seek_forward(self, seconds=1): + def seek_forward(self, seconds: Union[int, float, timedelta] = 1): """Skip ahead X seconds. Args: @@ -929,7 +507,7 @@ def seek_forward(self, seconds=1): self.bus.emit(Message('ovos.audio.service.seek_forward', {"seconds": seconds})) - def seek_backward(self, seconds=1): + def seek_backward(self, seconds: Union[int, float, timedelta] = 1): """Rewind X seconds Args: @@ -1067,7 +645,7 @@ def seek(self, seconds=1): else: self.seek_forward(seconds) - def seek_forward(self, seconds=1): + def seek_forward(self, seconds: Union[int, float, timedelta] = 1): """Skip ahead X seconds. Args: @@ -1078,7 +656,7 @@ def seek_forward(self, seconds=1): self.bus.emit(Message('ovos.video.service.seek_forward', {"seconds": seconds})) - def seek_backward(self, seconds=1): + def seek_backward(self, seconds: Union[int, float, timedelta] = 1): """Rewind X seconds Args: @@ -1203,7 +781,7 @@ def set_track_position(self, seconds): self.bus.emit(Message('ovos.web.service.set_track_position', {"position": seconds * 1000})) # convert to ms - def seek(self, seconds=1): + def seek(self, seconds: Union[int, float, timedelta] = 1): """Seek X seconds. Args: @@ -1216,7 +794,7 @@ def seek(self, seconds=1): else: self.seek_forward(seconds) - def seek_forward(self, seconds=1): + def seek_forward(self, seconds: Union[int, float, timedelta] = 1): """Skip ahead X seconds. Args: @@ -1227,7 +805,7 @@ def seek_forward(self, seconds=1): self.bus.emit(Message('ovos.web.service.seek_forward', {"seconds": seconds})) - def seek_backward(self, seconds=1): + def seek_backward(self, seconds: Union[int, float, timedelta] = 1): """Rewind X seconds Args: @@ -1267,6 +845,11 @@ def is_playing(self): class OCPQuery: + try: + from ovos_utils.ocp import MediaType + except ImportError as e: + raise RuntimeError("This class requires ovos-utils ~=0.1") from e + cast2audio = [ MediaType.MUSIC, MediaType.PODCAST, @@ -1286,6 +869,10 @@ def __init__(self, query, bus, media_type=MediaType.GENERIC, config=None): self.reset() def reset(self): + try: + from ovos_utils.ocp import PlaybackMode + except ImportError as e: + raise RuntimeError("This method requires ovos-utils ~=0.1") from e self.active_skills = {} self.active_skills_lock = Lock() self.query_replies = [] @@ -1313,6 +900,10 @@ def send(self, skill_id: str = None): "question_type": self.media_type})) def wait(self): + try: + from ovos_utils.ocp import MediaType + except ImportError as e: + raise RuntimeError("This method requires ovos-utils ~=0.1") from e # if there is no match type defined, lets increase timeout a bit # since all skills need to search if self.media_type == MediaType.GENERIC: From 490514e9463131012f87b1f883ffe834bacfaf4d Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 6 May 2024 17:17:05 -0700 Subject: [PATCH 4/4] Refactor to allow initialization without ovos-utils 0.1 --- ovos_bus_client/apis/ocp.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/ovos_bus_client/apis/ocp.py b/ovos_bus_client/apis/ocp.py index 2bf2efe..05a8e4f 100644 --- a/ovos_bus_client/apis/ocp.py +++ b/ovos_bus_client/apis/ocp.py @@ -847,20 +847,26 @@ def is_playing(self): class OCPQuery: try: from ovos_utils.ocp import MediaType + cast2audio = [ + MediaType.MUSIC, + MediaType.PODCAST, + MediaType.AUDIOBOOK, + MediaType.RADIO, + MediaType.RADIO_THEATRE, + MediaType.VISUAL_STORY, + MediaType.NEWS + ] except ImportError as e: - raise RuntimeError("This class requires ovos-utils ~=0.1") from e - - cast2audio = [ - MediaType.MUSIC, - MediaType.PODCAST, - MediaType.AUDIOBOOK, - MediaType.RADIO, - MediaType.RADIO_THEATRE, - MediaType.VISUAL_STORY, - MediaType.NEWS - ] + from enum import IntEnum + + class MediaType(IntEnum): + GENERIC = 0 # nothing else matches + + cast2audio = None def __init__(self, query, bus, media_type=MediaType.GENERIC, config=None): + if self.cast2audio is None: + raise RuntimeError("This class requires ovos-utils ~=0.1") LOG.debug(f"Created {media_type.name} query: {query}") self.query = query self.media_type = media_type