Skip to content

Commit

Permalink
Tidy up My Subscription thread loop variables
Browse files Browse the repository at this point in the history
  • Loading branch information
MoojMidge committed Aug 25, 2024
1 parent 5af663d commit d2b44de
Showing 1 changed file with 29 additions and 25 deletions.
54 changes: 29 additions & 25 deletions resources/lib/youtube_plugin/youtube/client/youtube.py
Original file line number Diff line number Diff line change
Expand Up @@ -1653,31 +1653,35 @@ def _threaded_fetch(kwargs,
else:
threads['balance'].clear()

current_thread = threading.current_thread()
threads['available'].release()
threads['counter'].release()
if complete:
threads['pool_counts'][pool_id] = None
threads['counts'][pool_id] = None
else:
threads['pool_counts'][pool_id] -= 1
threads['pool_counts']['all'] -= 1
threads['current'].discard(current_thread)
threads['counts'][pool_id] -= 1
threads['counts']['all'] -= 1
threads['current'].discard(threading.current_thread())
threads['loop'].set()

try:
num_cores = cpu_count() or 1
except NotImplementedError:
num_cores = 1
max_threads = min(32, 2 * (num_cores + 4))
counts = {
'all': 0,
}
current_threads = set()
counter = threading.Semaphore(max_threads)
balance_enable = threading.Event()
loop_enable = threading.Event()
threads = {
'max': max_threads,
'available': threading.Semaphore(max_threads),
'current': set(),
'pool_counts': {
'all': 0,
},
'balance': threading.Event(),
'loop': threading.Event(),
'balance': balance_enable,
'loop': loop_enable,
'counter': counter,
'counts': counts,
'current': current_threads,
}

payloads = {}
if logged_in:
payloads[1] = {
Expand Down Expand Up @@ -1712,13 +1716,13 @@ def _threaded_fetch(kwargs,

completed = []
iterator = iter(payloads)
threads['loop'].set()
while threads['loop'].wait():
loop_enable.set()
while loop_enable.wait():
try:
pool_id = next(iterator)
except StopIteration:
threads['loop'].clear()
if not threads['current']:
loop_enable.clear()
if not current_threads:
break
for pool_id in completed:
del payloads[pool_id]
Expand All @@ -1728,7 +1732,7 @@ def _threaded_fetch(kwargs,

payload = payloads[pool_id]
payload['pool_id'] = pool_id
current_num = threads['pool_counts'].setdefault(pool_id, 0)
current_num = counts.setdefault(pool_id, 0)
if current_num is None:
completed.append(pool_id)
continue
Expand All @@ -1743,13 +1747,13 @@ def _threaded_fetch(kwargs,
completed.append(pool_id)
continue

available = threads['max'] - threads['pool_counts']['all']
available = max_threads - counts['all']
limit = payload['limit']
if limit:
if current_num >= limit:
continue
if available <= 0:
threads['balance'].set()
balance_enable.set()
elif available <= 0:
continue

Expand All @@ -1758,10 +1762,10 @@ def _threaded_fetch(kwargs,
kwargs=payload,
)
new_thread.daemon = True
threads['current'].add(new_thread)
threads['pool_counts'][pool_id] += 1
threads['pool_counts']['all'] += 1
threads['available'].acquire(True)
current_threads.add(new_thread)
counts[pool_id] += 1
counts['all'] += 1
counter.acquire(True)
new_thread.start()

items = _parse_feeds(threaded_output['feeds'])
Expand Down

0 comments on commit d2b44de

Please sign in to comment.