Skip to content

Commit

Permalink
feat(update): refactor update handling with UpdateManager for better …
Browse files Browse the repository at this point in the history
…management and subscriber notifications
  • Loading branch information
amnweb committed Nov 27, 2024
1 parent 9c0c287 commit c874c11
Showing 1 changed file with 100 additions and 80 deletions.
180 changes: 100 additions & 80 deletions src/core/widgets/yasb/update_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@
from PyQt6.QtWidgets import QLabel, QHBoxLayout, QWidget
from PyQt6.QtCore import Qt, QThread, pyqtSignal
import win32com.client
import subprocess
import logging

class UpdateWorker(QThread):
windows_update_signal = pyqtSignal(dict)
winget_update_signal = pyqtSignal(dict)

def __init__(self, update_type,exclude_list=None, parent=None):
def __init__(self, update_type, exclude_list=None, parent=None):
super().__init__(parent)
self.update_type = update_type
self.running = True
Expand All @@ -24,13 +22,12 @@ def __init__(self, update_type,exclude_list=None, parent=None):
def filter_updates(self, updates, names):
if not self.exclude_list:
return len(updates), names

valid_excludes = [x.lower() for x in self.exclude_list if x and x.strip()]
filtered_names = []
filtered_count = 0

for name in names:
# Only exclude if matches a non-empty pattern
if not any(excluded in name.lower() for excluded in valid_excludes):
filtered_names.append(name)
filtered_count += 1
Expand Down Expand Up @@ -63,9 +60,14 @@ def run(self):

lines = result.stdout.strip().split('\n')
fl = 0
while not lines[fl].startswith("Name"):
while fl < len(lines) and not lines[fl].startswith("Name"):
fl += 1


if fl >= len(lines):
logging.error("Invalid winget output format.")
self.winget_update_signal.emit({"count": 0, "names": []})
return

id_start = lines[fl].index("Id")
version_start = lines[fl].index("Version")
available_start = lines[fl].index("Available")
Expand Down Expand Up @@ -105,6 +107,77 @@ def run(self):
else:
self.winget_update_signal.emit({"count": 0, "names": []})

class UpdateManager:
_instance = None
_lock = threading.Lock()

def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._workers = {}
cls._instance._subscribers = []
return cls._instance

def register_subscriber(self, callback):
if callback not in self._subscribers:
self._subscribers.append(callback)

def unregister_subscriber(self, callback):
if callback in self._subscribers:
self._subscribers.remove(callback)

def notify_subscribers(self, event_type, data):
for subscriber in self._subscribers:
subscriber(event_type, data)

def start_worker(self, update_type, exclude_list=None):
if update_type not in self._workers:
worker = UpdateWorker(update_type, exclude_list)
if update_type == 'windows':
worker.windows_update_signal.connect(
lambda x: self.notify_subscribers('windows_update', x)
)
else:
worker.winget_update_signal.connect(
lambda x: self.notify_subscribers('winget_update', x)
)
self._workers[update_type] = worker
worker.start()

def stop_all(self):
for worker in self._workers.values():
worker.stop()
self._workers.clear()

def handle_left_click(self, label_type):
if label_type == 'windows':
subprocess.Popen('start ms-settings:windowsupdate', shell=True)
elif label_type == 'winget':
powershell_path = shutil.which('pwsh') or shutil.which('powershell') or 'powershell.exe'
command = f'start "Winget Upgrade" "{powershell_path}" -NoExit -Command "winget upgrade --all"'
subprocess.Popen(command, shell=True)
# Notify all subscribers to hide the container
self.notify_subscribers(f'{label_type}_hide', {})

def handle_right_click(self, label_type):
# Stop existing worker if running
if label_type in self._workers:
self._workers[label_type].stop()
del self._workers[label_type]
# Get correct exclude list based on type
exclude_list = []
for subscriber in self._subscribers:
if label_type == 'windows' and hasattr(subscriber, '_windows_update_exclude'):
exclude_list.extend(subscriber._windows_update_exclude)
elif label_type == 'winget' and hasattr(subscriber, '_winget_update_exclude'):
exclude_list.extend(subscriber._winget_update_exclude)

# Hide the container first
self.notify_subscribers(f'{label_type}_hide', {})
# Start new worker
self.start_worker(label_type, exclude_list)

class UpdateCheckWidget(BaseWidget):
validation_schema = VALIDATION_SCHEMA

Expand All @@ -116,60 +189,42 @@ def __init__(self, windows_update: dict[str, str], winget_update: dict[str, str]
self._windows_update = windows_update
self._winget_update = winget_update

self._window_update_enabled = self._windows_update['enabled']
self._windows_update_label = self._windows_update['label']
self._window_update_interval = int(self._windows_update['interval'] * 60)
self._windows_update_exclude = self._windows_update['exclude']
self._window_update_enabled = self._windows_update.get('enabled', False)
self._windows_update_label = self._windows_update.get('label', '')
self._windows_update_exclude = self._windows_update.get('exclude', [])

self._winget_update_enabled = self._winget_update['enabled']
self._winget_update_label = self._winget_update['label']
self._winget_update_interval = int(self._winget_update['interval'] * 60)
self._winget_update_exclude = self._winget_update['exclude']
self._winget_update_enabled = self._winget_update.get('enabled', False)
self._winget_update_label = self._winget_update.get('label', '')
self._winget_update_exclude = self._winget_update.get('exclude', [])

self.windows_update_data = 0
self.winget_update_data = 0

self._create_dynamically_label(self._winget_update_label, self._windows_update_label)

self._stop_event = threading.Event()

self.windows_worker = None
self.winget_worker = None
self._update_manager = UpdateManager()
self._update_manager.register_subscriber(self.emit_event)

if self._window_update_enabled:
self.start_windows_update_timer()
self._update_manager.start_worker('windows', self._windows_update_exclude)
if self._winget_update_enabled:
self.start_winget_update_timer()
self._update_manager.start_worker('winget', self._winget_update_exclude)

self.update_widget_visibility()


def start_windows_update_timer(self):
self.windows_worker = UpdateWorker('windows', self._windows_update_exclude)
self.windows_worker.windows_update_signal.connect(
lambda x: self.emit_event('windows_update', x)
)
self.windows_worker.start()


def start_winget_update_timer(self):
self.winget_worker = UpdateWorker('winget', self._winget_update_exclude)
self.winget_worker.winget_update_signal.connect(
lambda x: self.emit_event('winget_update', x)
)
self.winget_worker.start()


def emit_event(self, event_type, update_info):
if event_type == 'windows_update':
self.windows_update_data = update_info['count']
self._update_label('windows', update_info['count'], update_info['names'])
elif event_type == 'winget_update':
self.winget_update_data = update_info['count']
self._update_label('winget', update_info['count'], update_info['names'])
elif event_type == 'windows_hide':
self.hide_container('windows')
elif event_type == 'winget_hide':
self.hide_container('winget')
self.update_widget_visibility()


def _create_dynamically_label(self, windows_label: str, winget_label: str):
def process_content(label_text, label_type):
container = QWidget()
Expand All @@ -190,8 +245,8 @@ def process_content(label_text, label_type):
if not part:
continue
if '<span' in part and '</span>' in part:
class_name = re.search(r'class=(["\'])([^"\']+?)\1', part)
class_result = class_name.group(2) if class_name else 'icon'
class_match = re.search(r'class=(["\'])([^"\']+?)\1', part)
class_result = class_match.group(2) if class_match else 'icon'
icon = re.sub(r'<span.*?>|</span>', '', part).strip()
label = QLabel(icon)
label.setProperty("class", class_result)
Expand All @@ -210,7 +265,6 @@ def process_content(label_text, label_type):
if self._window_update_enabled:
self._windows_container, self._widget_windows = process_content(self._windows_update_label, "windows")


def _update_label(self, widget_type, data, names):
print(names)
if widget_type == 'winget':
Expand Down Expand Up @@ -240,47 +294,20 @@ def _update_label(self, widget_type, data, names):
else:
formatted_text = part.format(count=data)
active_widgets[widget_index].setText(formatted_text)
active_widgets[widget_index].setCursor(Qt.CursorShape.PointingHandCursor)
active_widgets[widget_index].setToolTip("\n".join(names))
active_widgets[widget_index].setStyleSheet(self.TOOLTIP_STYLE)
widget_index += 1


def reload_widget(self, widget_type, event=None):
self.hide_container(widget_type)
if widget_type == 'windows':
if self.windows_worker:
self.windows_worker.stop()
self.start_windows_update_timer()
elif widget_type == 'winget':
if self.winget_worker:
self.winget_worker.stop()
self.start_winget_update_timer()


def open_console(self, event=None):
powershell_path = shutil.which('pwsh') or shutil.which('powershell') or 'powershell.exe'
command = f'start "Winget Upgrade" "{powershell_path}" -NoExit -Command "winget upgrade --all"'
subprocess.Popen(command, shell=True)
self.hide_container('winget')


def open_windows_update(self, event=None):
subprocess.Popen('start ms-settings:windowsupdate', shell=True)
self.hide_container('windows')


def handle_mouse_events(self, label_type):
def event_handler(event):
if event.button() == Qt.MouseButton.LeftButton:
if label_type == 'windows':
self.open_windows_update(event)
elif label_type == 'winget':
self.open_console(event)
self._update_manager.handle_left_click(label_type)
elif event.button() == Qt.MouseButton.RightButton:
self.reload_widget(label_type, event)
self._update_manager.handle_right_click(label_type)
return event_handler


def hide_container(self, container):
if container == 'windows':
self.windows_update_data = 0
Expand All @@ -290,15 +317,8 @@ def hide_container(self, container):
self._update_label('winget', 0, [])
self.update_widget_visibility()


def update_widget_visibility(self):
if self.windows_update_data == 0 and self.winget_update_data == 0:
self.hide()
else:
self.show()

def stop_updates(self):
if self.windows_worker:
self.windows_worker.stop()
if self.winget_worker:
self.winget_worker.stop()
self.show()

0 comments on commit c874c11

Please sign in to comment.