diff --git a/tests/test_thread_safety.py b/tests/test_thread_safety.py index ed55a84..d7de986 100644 --- a/tests/test_thread_safety.py +++ b/tests/test_thread_safety.py @@ -1,5 +1,8 @@ +import pytest + from promise import Promise from promise.dataloader import DataLoader +from promise.utils import PY2 import threading @@ -113,3 +116,97 @@ def do(): assert assert_object['is_same_thread_1'] assert assert_object['is_same_thread_2'] + + +@pytest.mark.skipif(PY2, reason='python2 does not support setswitchinterval') +@pytest.mark.parametrize('num_threads', [1]) +@pytest.mark.parametrize('count', [10000]) +def test_with_process_loop(num_threads, count): + """ + Start a Promise in one thread, but resolve it in another. + """ + import queue + from random import randint + from threading import Thread, Barrier + from traceback import print_exc, format_exc + from sys import setswitchinterval + + test_with_process_loop._force_stop = False + items = queue.Queue() + barrier = Barrier(num_threads) + + asserts = [] + timeouts = [] + + def event_loop(): + stop_count = num_threads + while True: + item = items.get() + if item[0] == 'STOP': + stop_count -= 1 + if stop_count == 0: + break + if item[0] == 'ABORT': + break + if item[0] == 'ITEM': + (_, resolve, reject, i) = item + random_integer = randint(0, 100) + # 25% chances per each + if 0 <= random_integer < 25: + # nested rejected promise + resolve(Promise.rejected(ZeroDivisionError(i))) + elif 25 <= random_integer < 50: + # nested resolved promise + resolve(Promise.resolve(i)) + elif 50 <= random_integer < 75: + # plain resolve + resolve(i) + else: + # plain reject + reject(ZeroDivisionError(i)) + + def worker(): + barrier.wait() + # Force fast switching of threads, this is NOT used in real world case. However without this + # I was unable to reproduce the issue. + setswitchinterval(0.000001) + for i in range(0, count): + if test_with_process_loop._force_stop: + break + + def do(resolve, reject): + items.put(('ITEM', resolve, reject, i)) + + p = Promise(do) + try: + p.get(timeout=1) + except ZeroDivisionError: + pass + except AssertionError as e: + print("ASSERT", e) + print_exc() + test_with_process_loop._force_stop = True + items.put(('ABORT',)) + asserts.append(format_exc()) + except Exception as e: + print("Timeout", e) + print_exc() + test_with_process_loop._force_stop = True + items.put(('ABORT',)) + timeouts.append(format_exc()) + + items.put(('STOP',)) + + loop_thread = Thread(target=event_loop) + loop_thread.start() + + worker_threads = [Thread(target=worker) for i in range(0, num_threads)] + for t in worker_threads: + t.start() + + loop_thread.join() + for t in worker_threads: + t.join() + + assert asserts == [] + assert timeouts == []