From 27b18d5ad5a8bb46d20b5b37f79a9dfafffc7362 Mon Sep 17 00:00:00 2001 From: miro Date: Tue, 12 Nov 2024 12:47:44 +0000 Subject: [PATCH] fix:pep8 --- __init__.py | 9 ++-- util/alert.py | 114 +++++++++++++++++++++----------------------- util/locale.py | 2 +- util/parse_utils.py | 2 +- util/ui_models.py | 14 +++--- 5 files changed, 67 insertions(+), 74 deletions(-) diff --git a/__init__.py b/__init__.py index fec995d..54a6101 100644 --- a/__init__.py +++ b/__init__.py @@ -949,7 +949,7 @@ def handle_todo_list_entries(self, message: Optional[Message] = None, alert: Opt "list_todo_subitems", {"name": alert.alert_name, "items": join_word_list([alert.alert_name for alert in list_entries], - connector="and", sep=",", lang=self.lang)}, + connector="and", sep=",", lang=self.lang)}, wait=True, ) else: @@ -1043,7 +1043,7 @@ def handle_delete_todo_entries(self, message: Message): self.speak_dialog( "list_todo_subitems", {"items": join_word_list([todo.alert_name for todo in todos], - connector="and", sep=",", lang=self.lang)}, + connector="and", sep=",", lang=self.lang)}, wait=True, ) time.sleep(2) @@ -1163,8 +1163,8 @@ def confirm_alert(self, alert: Alert, message: Message, repeat_interval = translate("weekday", lang=self.lang) else: repeat_interval = join_word_list([spoken_weekday(day, self.lang) - for day in alert.repeat_days], - connector="and", sep=",", lang=self.lang) + for day in alert.repeat_days], + connector="and", sep=",", lang=self.lang) # Notify repeating alert if alert.audio_file: @@ -2012,4 +2012,3 @@ def stop(self): LOG.debug(f"skill-stop called, all active alerts will be removed") for alert in self.alert_manager.get_active_alerts(): self._dismiss_alert(alert.ident, speak=True) - diff --git a/util/alert.py b/util/alert.py index 198f2ef..167edc0 100644 --- a/util/alert.py +++ b/util/alert.py @@ -26,32 +26,30 @@ # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -from uuid import uuid4 import datetime as dt -from time import time import json +from time import time from typing import Set, Optional, Union, List +from uuid import uuid4 import icalendar from dateutil.relativedelta import relativedelta from json_database.utils import merge_dict -from ovos_utils.log import LOG -from ovos_config.locale import get_default_tz, get_default_lang from ovos_config import Configuration - - +from ovos_config.locale import get_default_tz, get_default_lang from ovos_skill_alerts.util import AlertType, DAVType, AlertPriority, Weekdays from ovos_skill_alerts.util.dav_utils import process_ical_event, process_ical_todo +from ovos_utils.log import LOG LOCAL_USER = "local" TZID = Configuration().get("location", {}).get("timezone", {}).get("code") or "UTC" def alert_time_in_range( - start: dt.datetime, - end: Optional[dt.datetime], - ref_start: dt.datetime, - ref_end: Optional[dt.datetime] + start: dt.datetime, + end: Optional[dt.datetime], + ref_start: dt.datetime, + ref_end: Optional[dt.datetime] ) -> bool: """ Checks if two alerts with given start and end datetimes @@ -67,7 +65,7 @@ def alert_time_in_range( return False if end is not None: return (start <= ref_start < end) \ - or (start <= ref_end < end if ref_end else False) + or (start <= ref_end < end if ref_end else False) # in this case it's the reverse question return ref_start <= start < ref_end @@ -98,8 +96,8 @@ def user(self) -> str: Return the user associated with this alert """ return self.context.get("username") \ - or self.context.get("user") \ - or LOCAL_USER + or self.context.get("user") \ + or LOCAL_USER @property def lang(self) -> str: @@ -115,22 +113,22 @@ def created(self) -> dt.datetime: """ dt_ = dt.datetime.fromtimestamp(self.context.get("created")) return dt_.astimezone(self.timezone).replace(microsecond=0) - + @property def stopwatch(self): """ Returns the time elapsed since the alert was created """ return self.now - self.created - + @property def stopwatch_mode(self): """ Returns the stopwatch mode """ return self.alert_type == AlertType.TIMER and \ - self.context.get("stopwatch_mode", False) - + self.context.get("stopwatch_mode", False) + @stopwatch_mode.setter def stopwatch_mode(self, mode: bool): """ @@ -145,7 +143,7 @@ def timezone(self) -> dt.tzinfo: """ expiration = self._data.get("next_expiration_time") return dt.datetime.fromisoformat(expiration).tzinfo if expiration \ - else get_default_tz() + else get_default_tz() @property def alert_type(self) -> AlertType: @@ -200,7 +198,7 @@ def priority(self) -> int: :returns: the alert priority (1-10) """ return self._data.get("priority") or AlertPriority.AVERAGE.value - + @priority.setter def priority(self, num: int): """ @@ -225,7 +223,7 @@ def alert_name(self) -> str: @property def now(self): return dt.datetime.now(tz=self.timezone) - + # Expiration @property def prenotification(self) -> Optional[dt.datetime]: @@ -237,7 +235,7 @@ def prenotification(self) -> Optional[dt.datetime]: notification = self.expiration + dt.timedelta( seconds=self._data.get("prenotification") ) - return notification if notification > self.now else None + return notification if notification > self.now else None @prenotification.setter def prenotification(self, time_: Union[dt.timedelta, dt.datetime, relativedelta, int]): @@ -270,8 +268,8 @@ def expiration(self) -> Optional[dt.datetime]: @expiration.setter def expiration(self, new_expiration: Union[dt.datetime, dt.date]): if new_expiration.__class__ == dt.date: - new_expiration = dt.datetime.combine(new_expiration, dt.time.min)\ - .replace(tzinfo=self.timezone) + new_expiration = dt.datetime.combine(new_expiration, dt.time.min) \ + .replace(tzinfo=self.timezone) self._data["next_expiration_time"] = new_expiration.isoformat() @property @@ -293,7 +291,7 @@ def is_expired(self) -> bool: return False now = dt.datetime.now(self.timezone) return now >= dt.datetime.fromisoformat(expiration) - + @property def is_all_day(self) -> bool: """ @@ -311,7 +309,7 @@ def is_all_day(self, all_day: bool): self._data["all_day"] = all_day if all_day: self._data["next_expiration_time"] = \ - (self.expiration.replace(hour=0, minute=0, second=0)).isoformat() + (self.expiration.replace(hour=0, minute=0, second=0)).isoformat() self.until = (self.until or self.expiration).replace(hour=23, minute=59, second=59) @property @@ -348,14 +346,14 @@ def repeat_frequency(self) -> Optional[dt.timedelta]: if self._data.get("repeat_frequency") else None ) - + @property def has_repeat(self) -> bool: """ :returns: Whether the alert has any repeat information. """ return any((self.repeat_days, self.repeat_frequency)) - + def reset_repeat(self): """ Resets the expiration time to the last expiration (or to the @@ -371,7 +369,7 @@ def reset_repeat(self): elif self.repeat_frequency: expiration = expiration - self.repeat_frequency self._data["next_expiration_time"] = expiration.isoformat() - + def remove_repeat(self) -> None: """ Purges repeat information @@ -387,12 +385,12 @@ def until(self) -> Optional[dt.datetime]: """ end = self._data.get("until") return dt.datetime.fromisoformat(end) if end else None - + @until.setter def until(self, end: Union[dt.datetime, dt.timedelta]) -> None: if self.expiration is None: raise Exception("You can't set an until without expiration") - + # relative to expiration if isinstance(end, dt.timedelta): self._data["until"] = (self.expiration + end).isoformat() @@ -400,7 +398,7 @@ def until(self, end: Union[dt.datetime, dt.timedelta]) -> None: self._data["until"] = end.isoformat() else: raise TypeError("until must be a datetime or timedelta object") - + # Media associated @property def audio_file(self) -> Optional[str]: @@ -408,7 +406,7 @@ def audio_file(self) -> Optional[str]: Returns the audio filename launched on expiration """ return self._data.get("audio_file") - + @property def ocp_request(self) -> str: """ @@ -422,7 +420,7 @@ def ocp_request(self, request: Union[dict, None]): if not isinstance(request, (dict, type(None))): raise TypeError("'request' is supposed to be of type dict") self._data["ocp"] = request - + @property def media_type(self): """ @@ -434,7 +432,7 @@ def media_type(self): elif self.audio_file: return "audio_file" return None - + @property def media_source(self): """ @@ -446,7 +444,7 @@ def media_source(self): elif self.audio_file: return self.audio_file return None - + # DAV properties @property def service(self) -> Optional[str]: @@ -572,8 +570,8 @@ def _get_next_expiration_time(self, skip=False) -> Optional[dt.datetime]: expiration += self.repeat_frequency elif self.repeat_days: while ( - expiration <= now - or Weekdays(expiration.weekday()) not in self.repeat_days + expiration <= now + or Weekdays(expiration.weekday()) not in self.repeat_days ): expiration += dt.timedelta(days=1) elif self.until is not None: @@ -589,7 +587,7 @@ def _get_next_expiration_time(self, skip=False) -> Optional[dt.datetime]: LOG.debug(f"New expiration set for {self.ident}({self.alert_name}): {expiration}") return expiration -# Constructors + # Constructors @staticmethod def from_dict(alert_data: dict): """ @@ -619,7 +617,7 @@ def to_ical(self) -> icalendar.Calendar: """ ical = icalendar.Calendar() expiration = self.expiration - + if self.dav_type == DAVType.VEVENT: component = icalendar.Event() alarm = icalendar.Alarm() @@ -670,20 +668,20 @@ def deserialize(alert_str: str): @staticmethod def create( - expiration: Union[dt.date, dt.datetime, str] = None, - prenotification: int = None, - alert_name: str = None, - alert_type: AlertType = AlertType.UNKNOWN, - dav_type: DAVType = DAVType.VEVENT, - priority: int = AlertPriority.AVERAGE.value, - repeat_frequency: Union[int, dt.timedelta] = None, - repeat_days: Set[Weekdays] = None, - until: Union[dt.datetime, str] = None, - audio_file: str = None, - dav_calendar: str = None, - dav_service: str = None, - context: dict = None, - lang: str = None + expiration: Union[dt.date, dt.datetime, str] = None, + prenotification: int = None, + alert_name: str = None, + alert_type: AlertType = AlertType.UNKNOWN, + dav_type: DAVType = DAVType.VEVENT, + priority: int = AlertPriority.AVERAGE.value, + repeat_frequency: Union[int, dt.timedelta] = None, + repeat_days: Set[Weekdays] = None, + until: Union[dt.datetime, str] = None, + audio_file: str = None, + dav_calendar: str = None, + dav_service: str = None, + context: dict = None, + lang: str = None ): """ Object representing an arbitrary alert @@ -707,8 +705,8 @@ def create( expiration = dt.datetime.fromisoformat(expiration) elif expiration.__class__ == dt.date: data["all_day"] = True - expiration = dt.datetime.combine(expiration, dt.time.min)\ - .replace(tzinfo=get_default_tz()) + expiration = dt.datetime.combine(expiration, dt.time.min) \ + .replace(tzinfo=get_default_tz()) if not expiration.tzinfo: raise ValueError("expiration missing tzinfo") # Round off any microseconds @@ -753,8 +751,7 @@ def create( context["ident"] = str(uuid4()) data.update({ - "next_expiration_time": expiration.isoformat() if expiration \ - else None, + "next_expiration_time": expiration.isoformat() if expiration else None, "prenotification": prenotification, "alert_type": alert_type.value, "dav_type": dav_type.value, @@ -782,7 +779,7 @@ def is_alert_type(alert: Alert, alert_type: AlertType) -> bool: """ if alert_type == AlertType.ALL: return True - + return alert.alert_type == alert_type.value @@ -811,4 +808,3 @@ def properties_changed(local: Alert, dav: Alert) -> bool: local.until != dav.until ] ) - \ No newline at end of file diff --git a/util/locale.py b/util/locale.py index cdd3418..082658f 100644 --- a/util/locale.py +++ b/util/locale.py @@ -165,7 +165,7 @@ def spoken_duration(alert_time: Union[dt.timedelta, dt.datetime], return nice_duration(int(_seconds), lang=lang) -def get_abbreviation(wd: Weekdays, lang = None) -> str: +def get_abbreviation(wd: Weekdays, lang=None) -> str: if wd == Weekdays.MON: return translate("abbreviation_monday", lang=lang) elif wd == Weekdays.TUE: diff --git a/util/parse_utils.py b/util/parse_utils.py index c332176..70c4c1b 100644 --- a/util/parse_utils.py +++ b/util/parse_utils.py @@ -33,6 +33,7 @@ from dateutil.relativedelta import relativedelta from ovos_bus_client.message import Message, dig_for_message +from ovos_bus_client.util import get_message_lang from ovos_config.locale import get_default_lang, get_default_tz from ovos_date_parser import nice_time, nice_day, extract_datetime, extract_duration from ovos_number_parser import extract_number @@ -49,7 +50,6 @@ from ovos_utils.log import LOG from ovos_utterance_normalizer import UtteranceNormalizerPlugin from rapidfuzz import fuzz -from ovos_bus_client.util import get_message_lang class Tokens(list): diff --git a/util/ui_models.py b/util/ui_models.py index 2a7e17b..ddf27dc 100644 --- a/util/ui_models.py +++ b/util/ui_models.py @@ -29,11 +29,10 @@ from datetime import datetime from ovos_date_parser import nice_duration, nice_time -from ovos_utils.log import LOG - from ovos_skill_alerts.util.alert import Alert, AlertType -from ovos_skill_alerts.util.locale import get_abbreviation, translate, datetime_display from ovos_skill_alerts.util.config import use_24h_format +from ovos_skill_alerts.util.locale import get_abbreviation, translate, datetime_display +from ovos_utils.log import LOG def build_gui_data(alert: Alert) -> dict: @@ -116,11 +115,10 @@ def build_alarm_data(alert: Alert) -> dict: def create_repeat_str(alert: Alert) -> str: - def get_sequences(d): seq = [[d[0]]] for i in range(1, len(d)): - if d[i-1]+1 == d[i]: + if d[i - 1] + 1 == d[i]: seq[-1].append(d[i]) else: seq.append([d[i]]) @@ -132,7 +130,7 @@ def get_sequences(d): for i, sequence in enumerate(sequences): first = get_abbreviation(min(sequence), lang=alert.lang) last = get_abbreviation(max(sequence), lang=alert.lang) - if len(sequence) > 2: + if len(sequence) > 2: sequences[i] = f"{first}-{last}" elif len(sequence) == 2: sequences[i] = f"{first},{last}" @@ -145,7 +143,7 @@ def get_sequences(d): if alert.until: if repeat_str: - repeat_str += "|" + repeat_str += "|" repeat_str += f"{translate('until', lang=alert.lang)} {datetime_display(alert.until.date(), lang=alert.lang)}" - + return repeat_str