diff --git a/src/core/widgets/yasb/update_check.py b/src/core/widgets/yasb/update_check.py index b668f13..09ae9a1 100644 --- a/src/core/widgets/yasb/update_check.py +++ b/src/core/widgets/yasb/update_check.py @@ -8,14 +8,12 @@ from PyQt6.QtWidgets import QLabel, QHBoxLayout, QWidget from PyQt6.QtCore import Qt, QThread, pyqtSignal import win32com.client -import subprocess -import logging class UpdateWorker(QThread): windows_update_signal = pyqtSignal(dict) winget_update_signal = pyqtSignal(dict) - def __init__(self, update_type,exclude_list=None, parent=None): + def __init__(self, update_type, exclude_list=None, parent=None): super().__init__(parent) self.update_type = update_type self.running = True @@ -24,13 +22,12 @@ def __init__(self, update_type,exclude_list=None, parent=None): def filter_updates(self, updates, names): if not self.exclude_list: return len(updates), names - + valid_excludes = [x.lower() for x in self.exclude_list if x and x.strip()] filtered_names = [] filtered_count = 0 for name in names: - # Only exclude if matches a non-empty pattern if not any(excluded in name.lower() for excluded in valid_excludes): filtered_names.append(name) filtered_count += 1 @@ -63,9 +60,14 @@ def run(self): lines = result.stdout.strip().split('\n') fl = 0 - while not lines[fl].startswith("Name"): + while fl < len(lines) and not lines[fl].startswith("Name"): fl += 1 - + + if fl >= len(lines): + logging.error("Invalid winget output format.") + self.winget_update_signal.emit({"count": 0, "names": []}) + return + id_start = lines[fl].index("Id") version_start = lines[fl].index("Version") available_start = lines[fl].index("Available") @@ -105,6 +107,77 @@ def run(self): else: self.winget_update_signal.emit({"count": 0, "names": []}) +class UpdateManager: + _instance = None + _lock = threading.Lock() + + def __new__(cls): + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._workers = {} + cls._instance._subscribers = [] + return cls._instance + + def register_subscriber(self, callback): + if callback not in self._subscribers: + self._subscribers.append(callback) + + def unregister_subscriber(self, callback): + if callback in self._subscribers: + self._subscribers.remove(callback) + + def notify_subscribers(self, event_type, data): + for subscriber in self._subscribers: + subscriber(event_type, data) + + def start_worker(self, update_type, exclude_list=None): + if update_type not in self._workers: + worker = UpdateWorker(update_type, exclude_list) + if update_type == 'windows': + worker.windows_update_signal.connect( + lambda x: self.notify_subscribers('windows_update', x) + ) + else: + worker.winget_update_signal.connect( + lambda x: self.notify_subscribers('winget_update', x) + ) + self._workers[update_type] = worker + worker.start() + + def stop_all(self): + for worker in self._workers.values(): + worker.stop() + self._workers.clear() + + def handle_left_click(self, label_type): + if label_type == 'windows': + subprocess.Popen('start ms-settings:windowsupdate', shell=True) + elif label_type == 'winget': + powershell_path = shutil.which('pwsh') or shutil.which('powershell') or 'powershell.exe' + command = f'start "Winget Upgrade" "{powershell_path}" -NoExit -Command "winget upgrade --all"' + subprocess.Popen(command, shell=True) + # Notify all subscribers to hide the container + self.notify_subscribers(f'{label_type}_hide', {}) + + def handle_right_click(self, label_type): + # Stop existing worker if running + if label_type in self._workers: + self._workers[label_type].stop() + del self._workers[label_type] + # Get correct exclude list based on type + exclude_list = [] + for subscriber in self._subscribers: + if label_type == 'windows' and hasattr(subscriber, '_windows_update_exclude'): + exclude_list.extend(subscriber._windows_update_exclude) + elif label_type == 'winget' and hasattr(subscriber, '_winget_update_exclude'): + exclude_list.extend(subscriber._winget_update_exclude) + + # Hide the container first + self.notify_subscribers(f'{label_type}_hide', {}) + # Start new worker + self.start_worker(label_type, exclude_list) + class UpdateCheckWidget(BaseWidget): validation_schema = VALIDATION_SCHEMA @@ -116,50 +189,29 @@ def __init__(self, windows_update: dict[str, str], winget_update: dict[str, str] self._windows_update = windows_update self._winget_update = winget_update - self._window_update_enabled = self._windows_update['enabled'] - self._windows_update_label = self._windows_update['label'] - self._window_update_interval = int(self._windows_update['interval'] * 60) - self._windows_update_exclude = self._windows_update['exclude'] + self._window_update_enabled = self._windows_update.get('enabled', False) + self._windows_update_label = self._windows_update.get('label', '') + self._windows_update_exclude = self._windows_update.get('exclude', []) - self._winget_update_enabled = self._winget_update['enabled'] - self._winget_update_label = self._winget_update['label'] - self._winget_update_interval = int(self._winget_update['interval'] * 60) - self._winget_update_exclude = self._winget_update['exclude'] + self._winget_update_enabled = self._winget_update.get('enabled', False) + self._winget_update_label = self._winget_update.get('label', '') + self._winget_update_exclude = self._winget_update.get('exclude', []) self.windows_update_data = 0 self.winget_update_data = 0 self._create_dynamically_label(self._winget_update_label, self._windows_update_label) - self._stop_event = threading.Event() - - self.windows_worker = None - self.winget_worker = None + self._update_manager = UpdateManager() + self._update_manager.register_subscriber(self.emit_event) if self._window_update_enabled: - self.start_windows_update_timer() + self._update_manager.start_worker('windows', self._windows_update_exclude) if self._winget_update_enabled: - self.start_winget_update_timer() + self._update_manager.start_worker('winget', self._winget_update_exclude) self.update_widget_visibility() - - def start_windows_update_timer(self): - self.windows_worker = UpdateWorker('windows', self._windows_update_exclude) - self.windows_worker.windows_update_signal.connect( - lambda x: self.emit_event('windows_update', x) - ) - self.windows_worker.start() - - - def start_winget_update_timer(self): - self.winget_worker = UpdateWorker('winget', self._winget_update_exclude) - self.winget_worker.winget_update_signal.connect( - lambda x: self.emit_event('winget_update', x) - ) - self.winget_worker.start() - - def emit_event(self, event_type, update_info): if event_type == 'windows_update': self.windows_update_data = update_info['count'] @@ -167,9 +219,12 @@ def emit_event(self, event_type, update_info): elif event_type == 'winget_update': self.winget_update_data = update_info['count'] self._update_label('winget', update_info['count'], update_info['names']) + elif event_type == 'windows_hide': + self.hide_container('windows') + elif event_type == 'winget_hide': + self.hide_container('winget') self.update_widget_visibility() - def _create_dynamically_label(self, windows_label: str, winget_label: str): def process_content(label_text, label_type): container = QWidget() @@ -190,8 +245,8 @@ def process_content(label_text, label_type): if not part: continue if '' in part: - class_name = re.search(r'class=(["\'])([^"\']+?)\1', part) - class_result = class_name.group(2) if class_name else 'icon' + class_match = re.search(r'class=(["\'])([^"\']+?)\1', part) + class_result = class_match.group(2) if class_match else 'icon' icon = re.sub(r'|', '', part).strip() label = QLabel(icon) label.setProperty("class", class_result) @@ -210,7 +265,6 @@ def process_content(label_text, label_type): if self._window_update_enabled: self._windows_container, self._widget_windows = process_content(self._windows_update_label, "windows") - def _update_label(self, widget_type, data, names): print(names) if widget_type == 'winget': @@ -240,47 +294,20 @@ def _update_label(self, widget_type, data, names): else: formatted_text = part.format(count=data) active_widgets[widget_index].setText(formatted_text) + active_widgets[widget_index].setCursor(Qt.CursorShape.PointingHandCursor) active_widgets[widget_index].setToolTip("\n".join(names)) active_widgets[widget_index].setStyleSheet(self.TOOLTIP_STYLE) widget_index += 1 - def reload_widget(self, widget_type, event=None): - self.hide_container(widget_type) - if widget_type == 'windows': - if self.windows_worker: - self.windows_worker.stop() - self.start_windows_update_timer() - elif widget_type == 'winget': - if self.winget_worker: - self.winget_worker.stop() - self.start_winget_update_timer() - - - def open_console(self, event=None): - powershell_path = shutil.which('pwsh') or shutil.which('powershell') or 'powershell.exe' - command = f'start "Winget Upgrade" "{powershell_path}" -NoExit -Command "winget upgrade --all"' - subprocess.Popen(command, shell=True) - self.hide_container('winget') - - - def open_windows_update(self, event=None): - subprocess.Popen('start ms-settings:windowsupdate', shell=True) - self.hide_container('windows') - - def handle_mouse_events(self, label_type): def event_handler(event): if event.button() == Qt.MouseButton.LeftButton: - if label_type == 'windows': - self.open_windows_update(event) - elif label_type == 'winget': - self.open_console(event) + self._update_manager.handle_left_click(label_type) elif event.button() == Qt.MouseButton.RightButton: - self.reload_widget(label_type, event) + self._update_manager.handle_right_click(label_type) return event_handler - def hide_container(self, container): if container == 'windows': self.windows_update_data = 0 @@ -290,15 +317,8 @@ def hide_container(self, container): self._update_label('winget', 0, []) self.update_widget_visibility() - def update_widget_visibility(self): if self.windows_update_data == 0 and self.winget_update_data == 0: self.hide() else: - self.show() - - def stop_updates(self): - if self.windows_worker: - self.windows_worker.stop() - if self.winget_worker: - self.winget_worker.stop() \ No newline at end of file + self.show() \ No newline at end of file