diff --git a/ably/executer/decorator.py b/ably/executer/decorator.py index 515e3fa2..5f3a80f6 100644 --- a/ably/executer/decorator.py +++ b/ably/executer/decorator.py @@ -57,7 +57,7 @@ def wrapper(self, *args, **kwargs): caller_eventloop: events = asyncio.get_running_loop() except Exception: pass - app_loop: events = AppEventLoop.get_global().loop + app_loop: events = self.app_loop.loop res = fn(self, *args, **kwargs) if asyncio.iscoroutine(res): @@ -85,15 +85,15 @@ def run_safe_async(fn): import asyncio @functools.wraps(fn) - def wrapper(*args, **kwargs): + def wrapper(self, *args, **kwargs): caller_eventloop = None try: caller_eventloop: events = asyncio.get_running_loop() except Exception: pass - app_loop: events = AppEventLoop.get_global().loop + app_loop: events = self.app_loop.loop - res = fn(*args, **kwargs) + res = fn(self, *args, **kwargs) if asyncio.iscoroutine(res): # Handle calls from app eventloop on the same loop, return awaitable if caller_eventloop is not None and caller_eventloop == app_loop: @@ -115,19 +115,8 @@ def close_app_eventloop(fn): @functools.wraps(fn) def wrapper(self, *args, **kwargs): - app_loop = AppEventLoop.get_global() - # Handle result of the given async method, with blocking behaviour - caller_eventloop = None - try: - caller_eventloop: events = asyncio.get_running_loop() - except Exception: - pass + app_loop: events = self.app_loop - # Handle calls from app eventloop on the same loop, return awaitable - if caller_eventloop is not None and caller_eventloop == app_loop.loop: - return fn(self, *args, **kwargs) - - # Return awaitable as is! if not self.sync_enabled: app_loop.close() return fn(self, *args, **kwargs) diff --git a/ably/executer/eventloop.py b/ably/executer/eventloop.py index fefb0d71..65dd3846 100644 --- a/ably/executer/eventloop.py +++ b/ably/executer/eventloop.py @@ -6,32 +6,36 @@ class AppEventLoop: _global: 'AppEventLoop' = None - loop: events - thread: threading - is_active: bool + __loop: events + __thread: threading + __is_active: bool - def __init__(self): - self.loop = None - self.thread = None - self.is_active = False + def __init__(self, loop=None, thread=None): + if not loop.is_running: + raise Exception("Provided eventloop must be in running state") + self.__loop = loop + self.__thread = thread + self.__is_active = True @staticmethod def get_global() -> 'AppEventLoop': - if AppEventLoop._global is None or not AppEventLoop._global.is_active: + if AppEventLoop._global is None or not AppEventLoop._global.__is_active: AppEventLoop._global = AppEventLoop.create(True) return AppEventLoop._global @staticmethod def create(background=False) -> 'AppEventLoop': - app_loop = AppEventLoop() - app_loop.loop = asyncio.new_event_loop() - app_loop.thread = threading.Thread(target=app_loop.loop.run_forever, daemon=background) - app_loop.thread.start() - app_loop.is_active = True - return app_loop + loop = asyncio.new_event_loop() + thread = threading.Thread(target=loop.run_forever, daemon=background) + thread.start() + return AppEventLoop(loop, thread) + + @property + def loop(self): + return self.__loop def run(self, coro, callback): - self.loop.call_soon() + self.__loop.call_soon() def run_sync(self, coro): # todo - can only handle run_sync from different thread than app_loop @@ -45,21 +49,16 @@ def run_sync(self, coro): # task = self.loop.create_task(coro) # task.add_done_callback - future = asyncio.run_coroutine_threadsafe(coro, self.loop) + future = asyncio.run_coroutine_threadsafe(coro, self.__loop) return future.result() def run_async(self, coro): - future = asyncio.run_coroutine_threadsafe(coro, self.loop) + future = asyncio.run_coroutine_threadsafe(coro, self.__loop) return asyncio.wrap_future(future) - def _close(self): - if self.loop is not None and self.loop.is_running(): - self.loop.call_soon_threadsafe(self.loop.stop) - if self.thread is not None: - self.thread.join() - self.is_active = False - def close(self) -> events: - if self == AppEventLoop._global: # Global eventloop is shared amongst multiple client instances. - return # It will be reused, and closed automatically once process ends. - self._close() + if self.__thread is not None: + if self.__loop is not None and self.__loop.is_running(): + self.__loop.call_soon_threadsafe(self.__loop.stop) + self.__thread.join() + self.__is_active = False diff --git a/ably/executer/eventloop_helper.py b/ably/executer/eventloop_helper.py index 090c82ca..e1b82382 100644 --- a/ably/executer/eventloop_helper.py +++ b/ably/executer/eventloop_helper.py @@ -1,20 +1,19 @@ -from asyncio import events - - -def run(loop: events, coro): - print("run") - -def run_safe(loop: events, coro): - print("run safe") - -def run_sync(loop: events, coro): - print("run sync") - -def run_sync_safe(loop: events, coro): - print("run sync safe") - -def run_async(loop: events, coro): - print("run async") - -def run_async_safe(loop: events, coro): - print("run async safe") +# from asyncio import events +# +# def run(loop: events, coro, callback): +# print("run") +# +# def run_safe(loop: events, coro, callback): +# print("run safe") +# +# def run_sync(loop: events, coro): +# print("run sync") +# +# def run_sync_safe(loop: events, coro): +# print("run sync safe") +# +# async def run_async(loop: events, coro): +# print("run async") +# +# async def run_async_safe(loop: events, coro): +# print("run async safe") diff --git a/ably/http/http.py b/ably/http/http.py index 71a4bedb..7e38d06c 100644 --- a/ably/http/http.py +++ b/ably/http/http.py @@ -265,6 +265,10 @@ async def put(self, url, headers=None, body=None, skip_auth=False, timeout=None) def sync_enabled(self): return self.__ably.sync_enabled + @property + def app_loop(self): + return self.__ably.app_loop + @property def auth(self): return self.__auth diff --git a/ably/http/paginatedresult.py b/ably/http/paginatedresult.py index 9615b5aa..3c849c2d 100644 --- a/ably/http/paginatedresult.py +++ b/ably/http/paginatedresult.py @@ -78,6 +78,10 @@ async def next(self): def sync_enabled(self): return self.__http.sync_enabled + @property + def app_loop(self): + return self.__http.app_loop + async def __get_rel(self, rel_req): if rel_req is None: return None diff --git a/ably/rest/auth.py b/ably/rest/auth.py index 1719c037..b200893c 100644 --- a/ably/rest/auth.py +++ b/ably/rest/auth.py @@ -309,6 +309,10 @@ async def create_token_request(self, token_params: Optional[dict] = None, key_na def sync_enabled(self): return self.ably.sync_enabled + @property + def app_loop(self): + return self.ably.app_loop + @property def ably(self): return self.__ably diff --git a/ably/rest/channel.py b/ably/rest/channel.py index 777a3f42..cc8f33e1 100644 --- a/ably/rest/channel.py +++ b/ably/rest/channel.py @@ -147,6 +147,10 @@ async def status(self): def sync_enabled(self): return self.ably.sync_enabled + @property + def app_loop(self): + return self.ably.app_loop + @property def ably(self): return self.__ably diff --git a/ably/rest/rest.py b/ably/rest/rest.py index 6e889130..f59a5c1a 100644 --- a/ably/rest/rest.py +++ b/ably/rest/rest.py @@ -64,12 +64,18 @@ def __init__(self, key: Optional[str] = None, token: Optional[str] = None, else: options = Options(**kwargs) - self.__loop = options.loop + self.__sync_enabled = options.sync_enabled + if self.__sync_enabled: + if options.loop is None: + self.__app_loop = AppEventLoop.create() + else: + self.__app_loop = AppEventLoop(loop=options.loop) + try: self._is_realtime except AttributeError: self._is_realtime = False - self.__sync_enabled = False + self.__http = Http(self, options) self.__auth = Auth(self, options) self.__http.auth = self.__auth @@ -120,14 +126,7 @@ def sync_enabled(self): @property def app_loop(self): - return self.__loop - - @sync_enabled.setter - def sync_enabled(self, enable_sync): - self.__sync_enabled = enable_sync - if enable_sync: - if self.__loop is None: - self.__loop = AppEventLoop.get_global() + return self.__app_loop @property def http(self): diff --git a/ably/types/options.py b/ably/types/options.py index abfe41c6..ca5e571a 100644 --- a/ably/types/options.py +++ b/ably/types/options.py @@ -14,7 +14,8 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realti http_max_retry_count=None, http_max_retry_duration=None, fallback_hosts=None, fallback_retry_timeout=None, disconnected_retry_timeout=None, idempotent_rest_publishing=None, loop=None, auto_connect=True, suspended_retry_timeout=None, connectivity_check_url=None, - channel_retry_timeout=Defaults.channel_retry_timeout, add_request_ids=False, **kwargs): + channel_retry_timeout=Defaults.channel_retry_timeout, add_request_ids=False, sync_enabled=False, + **kwargs): super().__init__(**kwargs) @@ -70,6 +71,7 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realti self.__disconnected_retry_timeout = disconnected_retry_timeout self.__channel_retry_timeout = channel_retry_timeout self.__idempotent_rest_publishing = idempotent_rest_publishing + self.__sync_enabled = sync_enabled self.__loop = loop self.__auto_connect = auto_connect self.__connection_state_ttl = connection_state_ttl @@ -222,6 +224,10 @@ def channel_retry_timeout(self): def idempotent_rest_publishing(self): return self.__idempotent_rest_publishing + @property + def sync_enabled(self): + return self.__sync_enabled + @property def loop(self): return self.__loop diff --git a/ably/types/presence.py b/ably/types/presence.py index 56b453ea..457088f5 100644 --- a/ably/types/presence.py +++ b/ably/types/presence.py @@ -173,6 +173,10 @@ async def history(self, limit=None, direction=None, start=None, end=None): def sync_enabled(self): return self.__http.sync_enabled + @property + def app_loop(self): + return self.__http.app_loop + def make_presence_response_handler(cipher): def encrypted_presence_response_handler(response): diff --git a/test/ably/sync/sync_restauth_test.py b/test/ably/sync/sync_restauth_test.py index c8b3f278..26d15a6f 100644 --- a/test/ably/sync/sync_restauth_test.py +++ b/test/ably/sync/sync_restauth_test.py @@ -81,8 +81,7 @@ def test_auth_init_with_token(self): # RSA11 def test_request_basic_auth_header(self): - ably = AblyRest(key_secret='foo', key_name='bar') - ably.sync_enabled = True + ably = AblyRest(key_secret='foo', key_name='bar', sync_enabled=True) with mock.patch.object(AsyncClient, 'send') as get_mock: try: ably.http.get('/time', skip_auth=False) @@ -91,11 +90,10 @@ def test_request_basic_auth_header(self): request = get_mock.call_args_list[0][0][0] authorization = request.headers['Authorization'] assert authorization == 'Basic %s' % base64.b64encode('bar:foo'.encode('ascii')).decode('utf-8') - + ably.close() # RSA7e2 def test_request_basic_auth_header_with_client_id(self): - ably = AblyRest(key_secret='foo', key_name='bar', client_id='client_id') - ably.sync_enabled = True + ably = AblyRest(key_secret='foo', key_name='bar', client_id='client_id', sync_enabled=True) with mock.patch.object(AsyncClient, 'send') as get_mock: try: ably.http.get('/time', skip_auth=False) @@ -104,10 +102,10 @@ def test_request_basic_auth_header_with_client_id(self): request = get_mock.call_args_list[0][0][0] client_id = request.headers['x-ably-clientid'] assert client_id == base64.b64encode('client_id'.encode('ascii')).decode('utf-8') + ably.close() def test_request_token_auth_header(self): - ably = AblyRest(token='not_a_real_token') - ably.sync_enabled = True + ably = AblyRest(token='not_a_real_token', sync_enabled=True) with mock.patch.object(AsyncClient, 'send') as get_mock: try: ably.http.get('/time', skip_auth=False) @@ -116,7 +114,7 @@ def test_request_token_auth_header(self): request = get_mock.call_args_list[0][0][0] authorization = request.headers['Authorization'] assert authorization == 'Bearer %s' % base64.b64encode('not_a_real_token'.encode('ascii')).decode('utf-8') - + ably.close() def test_if_cant_authenticate_via_token(self): with pytest.raises(ValueError): AblyRest(use_token_auth=True) diff --git a/test/ably/sync/sync_resthttp_test.py b/test/ably/sync/sync_resthttp_test.py index 488ad069..b4caab2c 100644 --- a/test/ably/sync/sync_resthttp_test.py +++ b/test/ably/sync/sync_resthttp_test.py @@ -19,8 +19,7 @@ class TestRestHttp(BaseAsyncTestCase): def test_max_retry_attempts_and_timeouts_defaults(self): - ably = AblyRest(token="foo") - ably.sync_enabled = True + ably = AblyRest(token="foo", sync_enabled=True) assert 'http_open_timeout' in ably.http.CONNECTION_RETRY_DEFAULTS assert 'http_request_timeout' in ably.http.CONNECTION_RETRY_DEFAULTS @@ -34,8 +33,7 @@ def test_max_retry_attempts_and_timeouts_defaults(self): ably.close() def test_cumulative_timeout(self): - ably = AblyRest(token="foo") - ably.sync_enabled = True + ably = AblyRest(token="foo", sync_enabled=True) assert 'http_max_retry_duration' in ably.http.CONNECTION_RETRY_DEFAULTS @@ -53,8 +51,7 @@ def sleep_and_raise(*args, **kwargs): ably.close() def test_host_fallback(self): - ably = AblyRest(token="foo") - ably.sync_enabled = True + ably = AblyRest(token="foo", sync_enabled=True) def make_url(host): base_url = "%s://%s:%d" % (ably.http.preferred_scheme, @@ -86,8 +83,7 @@ def make_url(host): @respx.mock def test_no_host_fallback_nor_retries_if_custom_host(self): custom_host = 'example.org' - ably = AblyRest(token="foo", rest_host=custom_host) - ably.sync_enabled = True + ably = AblyRest(token="foo", rest_host=custom_host, sync_enabled=True) mock_route = respx.get("https://example.org").mock(side_effect=httpx.RequestError('')) @@ -137,8 +133,7 @@ async def side_effect(*args, **kwargs): @respx.mock def test_no_retry_if_not_500_to_599_http_code(self): default_host = Options().get_rest_host() - ably = AblyRest(token="foo") - ably.sync_enabled = True + ably = AblyRest(token="foo", sync_enabled=True) default_url = "%s://%s:%d/" % ( ably.http.preferred_scheme, @@ -163,8 +158,7 @@ def test_500_errors(self): https://github.com/ably/ably-python/issues/160 """ - ably = AblyRest(token="foo") - ably.sync_enabled = True + ably = AblyRest(token="foo", sync_enabled=True) def raise_ably_exception(*args, **kwargs): raise AblyException(message="", status_code=500, code=50000) @@ -181,13 +175,13 @@ def raise_ably_exception(*args, **kwargs): def test_custom_http_timeouts(self): ably = AblyRest( token="foo", http_request_timeout=30, http_open_timeout=8, - http_max_retry_count=6, http_max_retry_duration=20) - ably.sync_enabled = True + http_max_retry_count=6, http_max_retry_duration=20, sync_enabled=True) assert ably.http.http_request_timeout == 30 assert ably.http.http_open_timeout == 8 assert ably.http.http_max_retry_count == 6 assert ably.http.http_max_retry_duration == 20 + ably.close() # RSC7a, RSC7b def test_request_headers(self): diff --git a/test/ably/sync/sync_restrequest_test.py b/test/ably/sync/sync_restrequest_test.py index bcb988eb..6e0bd196 100644 --- a/test/ably/sync/sync_restrequest_test.py +++ b/test/ably/sync/sync_restrequest_test.py @@ -95,8 +95,7 @@ def test_headers(self): def test_timeout(self): # Timeout timeout = 0.000001 - ably = AblyRest(token="foo", http_request_timeout=timeout) - ably.sync_enabled = True + ably = AblyRest(token="foo", http_request_timeout=timeout, sync_enabled=True) assert ably.http.http_request_timeout == timeout with pytest.raises(httpx.ReadTimeout): ably.request('GET', '/time', version=Defaults.protocol_version) @@ -122,8 +121,8 @@ def test_timeout(self): rest_host='some.other.host', port=self.test_vars["port"], tls_port=self.test_vars["tls_port"], - tls=self.test_vars["tls"]) - ably.sync_enabled = True + tls=self.test_vars["tls"], + sync_enabled=True) with pytest.raises(httpx.ConnectError): ably.request('GET', '/time', version=Defaults.protocol_version) ably.close() diff --git a/test/ably/testapp.py b/test/ably/testapp.py index 7c8fead7..9560c742 100644 --- a/test/ably/testapp.py +++ b/test/ably/testapp.py @@ -125,9 +125,10 @@ async def get_test_vars(): @staticmethod @force_sync async def get_ably_rest(**kw): - rest = await TestApp.get_ably_rest(**kw) - rest.sync_enabled = True - return rest + test_vars = await TestApp.get_test_vars() + options = TestApp.get_options(test_vars, **kw) + options.update(sync_enabled=True, **kw) + return AblyRest(**options) @staticmethod @force_sync