diff --git a/resources/lib/youtube_plugin/youtube/client/youtube.py b/resources/lib/youtube_plugin/youtube/client/youtube.py index 0fbeeb732..a004e15b4 100644 --- a/resources/lib/youtube_plugin/youtube/client/youtube.py +++ b/resources/lib/youtube_plugin/youtube/client/youtube.py @@ -1653,14 +1653,13 @@ def _threaded_fetch(kwargs, else: threads['balance'].clear() - current_thread = threading.current_thread() - threads['available'].release() + threads['counter'].release() if complete: - threads['pool_counts'][pool_id] = None + threads['counts'][pool_id] = None else: - threads['pool_counts'][pool_id] -= 1 - threads['pool_counts']['all'] -= 1 - threads['current'].discard(current_thread) + threads['counts'][pool_id] -= 1 + threads['counts']['all'] -= 1 + threads['current'].discard(threading.current_thread()) threads['loop'].set() try: @@ -1668,16 +1667,21 @@ def _threaded_fetch(kwargs, except NotImplementedError: num_cores = 1 max_threads = min(32, 2 * (num_cores + 4)) + counts = { + 'all': 0, + } + current_threads = set() + counter = threading.Semaphore(max_threads) + balance_enable = threading.Event() + loop_enable = threading.Event() threads = { - 'max': max_threads, - 'available': threading.Semaphore(max_threads), - 'current': set(), - 'pool_counts': { - 'all': 0, - }, - 'balance': threading.Event(), - 'loop': threading.Event(), + 'balance': balance_enable, + 'loop': loop_enable, + 'counter': counter, + 'counts': counts, + 'current': current_threads, } + payloads = {} if logged_in: payloads[1] = { @@ -1712,13 +1716,13 @@ def _threaded_fetch(kwargs, completed = [] iterator = iter(payloads) - threads['loop'].set() - while threads['loop'].wait(): + loop_enable.set() + while loop_enable.wait(): try: pool_id = next(iterator) except StopIteration: - threads['loop'].clear() - if not threads['current']: + loop_enable.clear() + if not current_threads: break for pool_id in completed: del payloads[pool_id] @@ -1728,7 +1732,7 @@ def _threaded_fetch(kwargs, payload = payloads[pool_id] payload['pool_id'] = pool_id - current_num = threads['pool_counts'].setdefault(pool_id, 0) + current_num = counts.setdefault(pool_id, 0) if current_num is None: completed.append(pool_id) continue @@ -1743,13 +1747,13 @@ def _threaded_fetch(kwargs, completed.append(pool_id) continue - available = threads['max'] - threads['pool_counts']['all'] + available = max_threads - counts['all'] limit = payload['limit'] if limit: if current_num >= limit: continue if available <= 0: - threads['balance'].set() + balance_enable.set() elif available <= 0: continue @@ -1758,10 +1762,10 @@ def _threaded_fetch(kwargs, kwargs=payload, ) new_thread.daemon = True - threads['current'].add(new_thread) - threads['pool_counts'][pool_id] += 1 - threads['pool_counts']['all'] += 1 - threads['available'].acquire(True) + current_threads.add(new_thread) + counts[pool_id] += 1 + counts['all'] += 1 + counter.acquire(True) new_thread.start() items = _parse_feeds(threaded_output['feeds'])