From 80242e2762f90c69d94df3aa2a7a2a8db90c810e Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Sep 2023 19:06:55 +0530 Subject: [PATCH 01/22] Added sync decorator to be used on async methods --- ably/decorator/__init__.py | 0 ably/decorator/sync.py | 44 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 ably/decorator/__init__.py create mode 100644 ably/decorator/sync.py diff --git a/ably/decorator/__init__.py b/ably/decorator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py new file mode 100644 index 00000000..11c4f06f --- /dev/null +++ b/ably/decorator/sync.py @@ -0,0 +1,44 @@ +import asyncio +import functools +import threading + +_loop = None +_thread = None + + +def get_custom_event_loop(): + global _loop, _thread + if _thread is None: + if _loop is None: + _loop = asyncio.new_event_loop() + if not _loop.is_running(): + _thread = threading.Thread( + target=_loop.run_forever, + daemon=True) + _thread.start() + return _loop + + +def enable_optional_sync(fn): + ''' + turn an async function to sync function + ''' + import asyncio + + @functools.wraps(fn) + def wrapper(*args, **kwargs): + existing_loop = None + try: + existing_loop = asyncio.get_running_loop() + except: + pass + loop = get_custom_event_loop() + res = fn(*args, **kwargs) + if asyncio.iscoroutine(res): + future = asyncio.run_coroutine_threadsafe(res, loop) + if existing_loop is not None and existing_loop.is_running(): + return asyncio.wrap_future(future) + return future.result() + return res + + return wrapper From 54bc3d72aeb77c9610725a0f0056edc1191a79ef Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Sep 2023 19:16:22 +0530 Subject: [PATCH 02/22] Added documentation for optional sync --- ably/decorator/sync.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index 11c4f06f..2db3e054 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -19,24 +19,25 @@ def get_custom_event_loop(): return _loop -def enable_optional_sync(fn): +def optional_sync(fn): ''' - turn an async function to sync function + Enables async function to be used as both sync and async function. + Also makes async/sync workflow thread safe. ''' import asyncio @functools.wraps(fn) def wrapper(*args, **kwargs): - existing_loop = None + caller_eventloop = None try: - existing_loop = asyncio.get_running_loop() + caller_eventloop = asyncio.get_running_loop() except: pass - loop = get_custom_event_loop() + ably_eventloop = get_custom_event_loop() res = fn(*args, **kwargs) if asyncio.iscoroutine(res): - future = asyncio.run_coroutine_threadsafe(res, loop) - if existing_loop is not None and existing_loop.is_running(): + future = asyncio.run_coroutine_threadsafe(res, ably_eventloop) + if caller_eventloop is not None and caller_eventloop.is_running(): return asyncio.wrap_future(future) return future.result() return res From 0a7184579128c8abfb2bc1a2b55a7ff96ff287b0 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Sep 2023 20:14:17 +0530 Subject: [PATCH 03/22] Added gitignore file to ignore idea specific files --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 71554b60..0d07b9f2 100644 --- a/.gitignore +++ b/.gitignore @@ -53,3 +53,5 @@ app_spec app_spec.pkl ably/types/options.py.orig test/ably/restsetup.py.orig + +.idea/**/* \ No newline at end of file From c6d99a4f6113e05616e06310633141435793e801 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Sep 2023 20:40:19 +0530 Subject: [PATCH 04/22] Removed unnecessary coroutine check for decorater sync --- .gitignore | 2 +- ably/decorator/sync.py | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 0d07b9f2..e7c864f7 100644 --- a/.gitignore +++ b/.gitignore @@ -54,4 +54,4 @@ app_spec.pkl ably/types/options.py.orig test/ably/restsetup.py.orig -.idea/**/* \ No newline at end of file +.idea/**/* diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index 2db3e054..3d156e44 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -23,6 +23,7 @@ def optional_sync(fn): ''' Enables async function to be used as both sync and async function. Also makes async/sync workflow thread safe. + This decorator should only be used on async methods/coroutines. ''' import asyncio @@ -34,12 +35,9 @@ def wrapper(*args, **kwargs): except: pass ably_eventloop = get_custom_event_loop() - res = fn(*args, **kwargs) - if asyncio.iscoroutine(res): - future = asyncio.run_coroutine_threadsafe(res, ably_eventloop) - if caller_eventloop is not None and caller_eventloop.is_running(): - return asyncio.wrap_future(future) - return future.result() - return res + future = asyncio.run_coroutine_threadsafe(fn(*args, **kwargs), ably_eventloop) + if caller_eventloop is not None and caller_eventloop.is_running(): + return asyncio.wrap_future(future) + return future.result() return wrapper From 33c940b54c8a603509e98442ecdd05e1248ab094 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Sep 2023 21:16:56 +0530 Subject: [PATCH 05/22] Added documentation to the sync decorator method --- ably/decorator/sync.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index 3d156e44..b1334f25 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -1,12 +1,13 @@ import asyncio import functools import threading +from asyncio import events -_loop = None -_thread = None +_loop: events = None +_thread: threading = None -def get_custom_event_loop(): +def get_custom_event_loop() -> events: global _loop, _thread if _thread is None: if _loop is None: @@ -31,13 +32,21 @@ def optional_sync(fn): def wrapper(*args, **kwargs): caller_eventloop = None try: - caller_eventloop = asyncio.get_running_loop() + caller_eventloop: events = asyncio.get_running_loop() except: pass - ably_eventloop = get_custom_event_loop() + ably_eventloop: events = get_custom_event_loop() + + # Handle calls from ably_eventloop on the same loop, return awaitable + if caller_eventloop is not None and caller_eventloop == ably_eventloop: + return ably_eventloop.create_task(fn(*args, **kwargs)) + + # Post external calls on ably_eventloop, return awaitable on calling eventloop future = asyncio.run_coroutine_threadsafe(fn(*args, **kwargs), ably_eventloop) if caller_eventloop is not None and caller_eventloop.is_running(): return asyncio.wrap_future(future) + + # If called from regular function instead of coroutine, block till result is available return future.result() return wrapper From 8dda5bd42d672168bef60b52faa5a4995e47c327 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Sep 2023 21:38:15 +0530 Subject: [PATCH 06/22] extracted event loop in a separate class --- ably/decorator/sync.py | 20 ++------------------ ably/executer/__init__.py | 0 ably/executer/eventloop.py | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 18 deletions(-) create mode 100644 ably/executer/__init__.py create mode 100644 ably/executer/eventloop.py diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index b1334f25..95e7cff1 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -1,23 +1,7 @@ -import asyncio import functools -import threading from asyncio import events -_loop: events = None -_thread: threading = None - - -def get_custom_event_loop() -> events: - global _loop, _thread - if _thread is None: - if _loop is None: - _loop = asyncio.new_event_loop() - if not _loop.is_running(): - _thread = threading.Thread( - target=_loop.run_forever, - daemon=True) - _thread.start() - return _loop +from ably.executer.eventloop import AblyEventLoop def optional_sync(fn): @@ -35,7 +19,7 @@ def wrapper(*args, **kwargs): caller_eventloop: events = asyncio.get_running_loop() except: pass - ably_eventloop: events = get_custom_event_loop() + ably_eventloop: events = AblyEventLoop.get_global().loop # Handle calls from ably_eventloop on the same loop, return awaitable if caller_eventloop is not None and caller_eventloop == ably_eventloop: diff --git a/ably/executer/__init__.py b/ably/executer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ably/executer/eventloop.py b/ably/executer/eventloop.py new file mode 100644 index 00000000..6ca4ebdc --- /dev/null +++ b/ably/executer/eventloop.py @@ -0,0 +1,34 @@ +import asyncio +import threading +from asyncio import events +from threading import Thread + + +class AblyEventLoop: + loop: events + thread: Thread + + __global_event_loop: 'AblyEventLoop' = None + + @staticmethod + def get_global() -> 'AblyEventLoop': + if AblyEventLoop.__global_event_loop is None: + AblyEventLoop.__global_event_loop = AblyEventLoop() + AblyEventLoop.__global_event_loop._create_if_not_exist() + return AblyEventLoop.__global_event_loop + + def _create_if_not_exist(self): + if self.loop is None: + self.loop = asyncio.new_event_loop() + if not self.loop.is_running(): + self.thread = threading.Thread( + target=self.loop.run_forever, + daemon=True) + self.thread.start() + + def close(self) -> events: + self.loop.stop() + self.loop.close() + self.loop = None + self.thread = None + From c9f052ece60c99b77a68fcc8ca17f51f88c5f97f Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Sep 2023 22:21:50 +0530 Subject: [PATCH 07/22] Refactored sync decoratoer, extracted eventloop into executer --- ably/decorator/sync.py | 13 +++++++------ ably/executer/eventloop.py | 12 ++++++------ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index 95e7cff1..a73a6ea8 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -26,11 +26,12 @@ def wrapper(*args, **kwargs): return ably_eventloop.create_task(fn(*args, **kwargs)) # Post external calls on ably_eventloop, return awaitable on calling eventloop - future = asyncio.run_coroutine_threadsafe(fn(*args, **kwargs), ably_eventloop) - if caller_eventloop is not None and caller_eventloop.is_running(): - return asyncio.wrap_future(future) - - # If called from regular function instead of coroutine, block till result is available - return future.result() + res = fn(*args, **kwargs) + if asyncio.iscoroutine(res): + future = asyncio.run_coroutine_threadsafe(res, ably_eventloop) + if caller_eventloop is not None and caller_eventloop.is_running(): + return asyncio.wrap_future(future) + return future.result() + return res return wrapper diff --git a/ably/executer/eventloop.py b/ably/executer/eventloop.py index 6ca4ebdc..f5da2164 100644 --- a/ably/executer/eventloop.py +++ b/ably/executer/eventloop.py @@ -5,19 +5,20 @@ class AblyEventLoop: - loop: events - thread: Thread - __global_event_loop: 'AblyEventLoop' = None + def __init__(self): + self.loop = None + self.thread = None + @staticmethod def get_global() -> 'AblyEventLoop': if AblyEventLoop.__global_event_loop is None: AblyEventLoop.__global_event_loop = AblyEventLoop() - AblyEventLoop.__global_event_loop._create_if_not_exist() + AblyEventLoop.__global_event_loop.__create_if_not_exist() return AblyEventLoop.__global_event_loop - def _create_if_not_exist(self): + def __create_if_not_exist(self): if self.loop is None: self.loop = asyncio.new_event_loop() if not self.loop.is_running(): @@ -31,4 +32,3 @@ def close(self) -> events: self.loop.close() self.loop = None self.thread = None - From 0cbc31ab967afb61b298d7dd97231a81ed2b7c89 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Sep 2023 22:23:33 +0530 Subject: [PATCH 08/22] Annotated rest api with optional sync decorators --- ably/http/paginatedresult.py | 5 +++++ ably/rest/auth.py | 6 ++++++ ably/rest/channel.py | 7 +++++++ ably/rest/rest.py | 15 +++++++++++++++ 4 files changed, 33 insertions(+) diff --git a/ably/http/paginatedresult.py b/ably/http/paginatedresult.py index 6421251b..39b10cd6 100644 --- a/ably/http/paginatedresult.py +++ b/ably/http/paginatedresult.py @@ -2,6 +2,7 @@ import logging from urllib.parse import urlencode +from ably.decorator.sync import optional_sync from ably.http.http import Request from ably.util import case @@ -65,9 +66,11 @@ def has_next(self): def is_last(self): return not self.has_next() + @optional_sync async def first(self): return await self.__get_rel(self.__rel_first) if self.__rel_first else None + @optional_sync async def next(self): return await self.__get_rel(self.__rel_next) if self.__rel_next else None @@ -77,6 +80,7 @@ async def __get_rel(self, rel_req): return await self.paginated_query_with_request(self.__http, rel_req, self.__response_processor) @classmethod + @optional_sync async def paginated_query(cls, http, method='GET', url='/', version=None, body=None, headers=None, response_processor=None, raise_on_error=True): @@ -86,6 +90,7 @@ async def paginated_query(cls, http, method='GET', url='/', version=None, body=N return await cls.paginated_query_with_request(http, req, response_processor) @classmethod + @optional_sync async def paginated_query_with_request(cls, http, request, response_processor, raise_on_error=True): response = await http.make_request( diff --git a/ably/rest/auth.py b/ably/rest/auth.py index 06af2438..76686fe3 100644 --- a/ably/rest/auth.py +++ b/ably/rest/auth.py @@ -7,6 +7,7 @@ import uuid import httpx +from ably.decorator.sync import optional_sync from ably.types.options import Options if TYPE_CHECKING: from ably.rest.rest import AblyRest @@ -84,6 +85,7 @@ def __init__(self, ably: Union[AblyRest, AblyRealtime], options: Options): raise ValueError("Can't authenticate via token, must provide " "auth_callback, auth_url, key, token or a TokenDetail") + @optional_sync async def get_auth_transport_param(self): auth_credentials = {} if self.auth_options.client_id: @@ -148,9 +150,11 @@ def token_details_has_expired(self): return expires < timestamp + token_details.TOKEN_EXPIRY_BUFFER + @optional_sync async def authorize(self, token_params: Optional[dict] = None, auth_options=None): return await self.__authorize_when_necessary(token_params, auth_options, force=True) + @optional_sync async def request_token(self, token_params: Optional[dict] = None, # auth_options key_name: Optional[str] = None, key_secret: Optional[str] = None, auth_callback=None, @@ -229,6 +233,7 @@ async def request_token(self, token_params: Optional[dict] = None, log.debug("Token: %s" % str(response_dict.get("token"))) return TokenDetails.from_dict(response_dict) + @optional_sync async def create_token_request(self, token_params: Optional[dict] = None, key_name: Optional[str] = None, key_secret: Optional[str] = None, query_time=None): token_params = token_params or {} @@ -386,6 +391,7 @@ def _timestamp(self): def _random_nonce(self): return uuid.uuid4().hex[:16] + @optional_sync async def token_request_from_auth_url(self, method: str, url: str, token_params, headers, auth_params): body = None diff --git a/ably/rest/channel.py b/ably/rest/channel.py index d7995607..71ec21fc 100644 --- a/ably/rest/channel.py +++ b/ably/rest/channel.py @@ -9,6 +9,7 @@ from methoddispatch import SingleDispatch, singledispatch import msgpack +from ably.decorator.sync import optional_sync from ably.http.paginatedresult import PaginatedResult, format_params from ably.types.channeldetails import ChannelDetails from ably.types.message import Message, make_message_response_handler @@ -29,6 +30,7 @@ def __init__(self, ably, name, options): self.__presence = Presence(self) @catch_all + @optional_sync async def history(self, direction=None, limit: int = None, start=None, end=None): """Returns the history for this channel""" params = format_params({}, direction=direction, start=start, end=end, limit=limit) @@ -81,10 +83,12 @@ def _publish(self, arg, *args, **kwargs): raise TypeError('Unexpected type %s' % type(arg)) @_publish.register(Message) + @optional_sync async def publish_message(self, message, params=None, timeout=None): return await self.publish_messages([message], params, timeout=timeout) @_publish.register(list) + @optional_sync async def publish_messages(self, messages, params=None, timeout=None): request_body = self.__publish_request_body(messages) if not self.ably.options.use_binary_protocol: @@ -99,10 +103,12 @@ async def publish_messages(self, messages, params=None, timeout=None): return await self.ably.http.post(path, body=request_body, timeout=timeout) @_publish.register(str) + @optional_sync async def publish_name_data(self, name, data, timeout=None): messages = [Message(name, data)] return await self.publish_messages(messages, timeout=timeout) + @optional_sync async def publish(self, *args, **kwargs): """Publishes a message on this channel. @@ -131,6 +137,7 @@ async def publish(self, *args, **kwargs): return await self._publish(*args, **kwargs) + @optional_sync async def status(self): """Retrieves current channel active status with no. of publishers, subscribers, presence_members etc""" diff --git a/ably/rest/rest.py b/ably/rest/rest.py index a42ba2fd..76bcda8a 100644 --- a/ably/rest/rest.py +++ b/ably/rest/rest.py @@ -2,6 +2,8 @@ from typing import Optional from urllib.parse import urlencode +from ably.decorator.sync import optional_sync +from ably.executer.eventloop import AblyEventLoop from ably.http.http import Http from ably.http.paginatedresult import PaginatedResult, HttpPaginatedResponse from ably.http.paginatedresult import format_params @@ -78,6 +80,9 @@ def __init__(self, key: Optional[str] = None, token: Optional[str] = None, async def __aenter__(self): return self + def __enter__(self): + return self + @catch_all async def stats(self, direction: Optional[str] = None, start=None, end=None, params: Optional[dict] = None, limit: Optional[int] = None, paginated=None, unit=None, timeout=None): @@ -88,6 +93,7 @@ async def stats(self, direction: Optional[str] = None, start=None, end=None, par self.http, url=url, response_processor=stats_response_processor) @catch_all + @optional_sync async def time(self, timeout: Optional[float] = None) -> float: """Returns the current server time in ms since the unix epoch""" r = await self.http.get('/time', skip_auth=True, timeout=timeout) @@ -119,6 +125,7 @@ def options(self): def push(self): return self.__push + @optional_sync async def request(self, method: str, path: str, version: str, params: Optional[dict] = None, body=None, headers=None): if version is None: @@ -144,5 +151,13 @@ def response_processor(response): async def __aexit__(self, *excinfo): await self.close() + def __exit__(self, *excinfo): + self.close_sync() + + @optional_sync async def close(self): await self.http.close() + + def close_sync(self): + self.close() + AblyEventLoop.get_global().close() From 1ccb72da24e05691e5792c89a77460ee1f1af828 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Sep 2023 22:26:56 +0530 Subject: [PATCH 09/22] Fixed linting issues --- ably/decorator/sync.py | 2 +- ably/executer/eventloop.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index a73a6ea8..1efd1047 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -17,7 +17,7 @@ def wrapper(*args, **kwargs): caller_eventloop = None try: caller_eventloop: events = asyncio.get_running_loop() - except: + except Exception: pass ably_eventloop: events = AblyEventLoop.get_global().loop diff --git a/ably/executer/eventloop.py b/ably/executer/eventloop.py index f5da2164..b1b56cc7 100644 --- a/ably/executer/eventloop.py +++ b/ably/executer/eventloop.py @@ -1,10 +1,11 @@ import asyncio import threading from asyncio import events -from threading import Thread class AblyEventLoop: + loop: events = None + thread: threading = None __global_event_loop: 'AblyEventLoop' = None def __init__(self): From 55a773fca45cd1530fc1ed7c7f3a27c20b08d341 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Sep 2023 23:00:23 +0530 Subject: [PATCH 10/22] closed eventloop thread properly as a part of close method --- ably/executer/eventloop.py | 11 ++++++----- ably/http/http.py | 2 ++ ably/rest/rest.py | 4 ++-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/ably/executer/eventloop.py b/ably/executer/eventloop.py index b1b56cc7..b4c39c4b 100644 --- a/ably/executer/eventloop.py +++ b/ably/executer/eventloop.py @@ -14,13 +14,14 @@ def __init__(self): @staticmethod def get_global() -> 'AblyEventLoop': - if AblyEventLoop.__global_event_loop is None: + if (AblyEventLoop.__global_event_loop is None or + AblyEventLoop.__global_event_loop.loop.is_closed()): AblyEventLoop.__global_event_loop = AblyEventLoop() AblyEventLoop.__global_event_loop.__create_if_not_exist() return AblyEventLoop.__global_event_loop def __create_if_not_exist(self): - if self.loop is None: + if self.loop is None or self.loop.is_closed(): self.loop = asyncio.new_event_loop() if not self.loop.is_running(): self.thread = threading.Thread( @@ -29,7 +30,7 @@ def __create_if_not_exist(self): self.thread.start() def close(self) -> events: - self.loop.stop() + # https://stackoverflow.com/questions/46093238/python-asyncio-event-loop-does-not-seem-to-stop-when-stop-method-is-called + self.loop.call_soon_threadsafe(self.loop.stop) + self.thread.join() self.loop.close() - self.loop = None - self.thread = None diff --git a/ably/http/http.py b/ably/http/http.py index e47ffb8f..2c4b4e48 100644 --- a/ably/http/http.py +++ b/ably/http/http.py @@ -7,6 +7,7 @@ import httpx import msgpack +from ably.decorator.sync import optional_sync from ably.rest.auth import Auth from ably.http.httputils import HttpUtils from ably.transport.defaults import Defaults @@ -131,6 +132,7 @@ def __init__(self, ably, options): self.__host_expires = None self.__client = httpx.AsyncClient(http2=True) + @optional_sync async def close(self): await self.__client.aclose() diff --git a/ably/rest/rest.py b/ably/rest/rest.py index 76bcda8a..30e6eb15 100644 --- a/ably/rest/rest.py +++ b/ably/rest/rest.py @@ -154,10 +154,10 @@ async def __aexit__(self, *excinfo): def __exit__(self, *excinfo): self.close_sync() - @optional_sync async def close(self): await self.http.close() + AblyEventLoop.get_global().close() def close_sync(self): - self.close() + self.http.close() AblyEventLoop.get_global().close() From ba49e49cd8287c3e7319d7cad3960da9bb581e1b Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Sep 2023 23:48:26 +0530 Subject: [PATCH 11/22] closing eventloop only if it's not closed --- ably/executer/eventloop.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ably/executer/eventloop.py b/ably/executer/eventloop.py index b4c39c4b..6b433932 100644 --- a/ably/executer/eventloop.py +++ b/ably/executer/eventloop.py @@ -30,7 +30,8 @@ def __create_if_not_exist(self): self.thread.start() def close(self) -> events: - # https://stackoverflow.com/questions/46093238/python-asyncio-event-loop-does-not-seem-to-stop-when-stop-method-is-called - self.loop.call_soon_threadsafe(self.loop.stop) - self.thread.join() - self.loop.close() + if self.loop is not None and not self.loop.is_closed: + # https://stackoverflow.com/questions/46093238/python-asyncio-event-loop-does-not-seem-to-stop-when-stop-method-is-called + self.loop.call_soon_threadsafe(self.loop.stop) + self.thread.join() + self.loop.close() From fa4a8e8d6fc3d6ddd0e9332085ec32150bc00335 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sat, 30 Sep 2023 01:11:10 +0530 Subject: [PATCH 12/22] Refactored sync decorator, fixed failing tests --- ably/decorator/sync.py | 10 +++++----- ably/http/http.py | 1 + ably/rest/auth.py | 6 +++++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index 1efd1047..200c8575 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -21,13 +21,13 @@ def wrapper(*args, **kwargs): pass ably_eventloop: events = AblyEventLoop.get_global().loop - # Handle calls from ably_eventloop on the same loop, return awaitable - if caller_eventloop is not None and caller_eventloop == ably_eventloop: - return ably_eventloop.create_task(fn(*args, **kwargs)) - - # Post external calls on ably_eventloop, return awaitable on calling eventloop res = fn(*args, **kwargs) if asyncio.iscoroutine(res): + # Handle calls from ably_eventloop on the same loop, return awaitable + if caller_eventloop is not None and caller_eventloop == ably_eventloop: + return ably_eventloop.create_task(res) + + # Post external calls on ably_eventloop, return awaitable on calling eventloop future = asyncio.run_coroutine_threadsafe(res, ably_eventloop) if caller_eventloop is not None and caller_eventloop.is_running(): return asyncio.wrap_future(future) diff --git a/ably/http/http.py b/ably/http/http.py index 2c4b4e48..7d550dd5 100644 --- a/ably/http/http.py +++ b/ably/http/http.py @@ -158,6 +158,7 @@ def get_rest_hosts(self): hosts.insert(0, host) return hosts + @optional_sync @reauth_if_expired async def make_request(self, method, path, version=None, headers=None, body=None, skip_auth=False, timeout=None, raise_on_error=True): diff --git a/ably/rest/auth.py b/ably/rest/auth.py index 76686fe3..78fde400 100644 --- a/ably/rest/auth.py +++ b/ably/rest/auth.py @@ -1,4 +1,6 @@ from __future__ import annotations + +import asyncio import base64 from datetime import timedelta import logging @@ -103,7 +105,9 @@ async def __authorize_when_necessary(self, token_params=None, auth_options=None, token_details = await self._ensure_valid_auth_credentials(token_params, auth_options, force) if self.ably._is_realtime: - await self.ably.connection.connection_manager.on_auth_updated(token_details) + realtime_loop = self.ably.connection.connection_manager.options.loop + future = asyncio.run_coroutine_threadsafe(self.ably.connection.connection_manager.on_auth_updated(token_details), realtime_loop) + await asyncio.wrap_future(future) return token_details From e41c473df57d4faa63343d4c388d90fd22ffe7df Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sat, 30 Sep 2023 01:20:49 +0530 Subject: [PATCH 13/22] refactored app event loop --- ably/decorator/sync.py | 2 +- ably/executer/eventloop.py | 14 +++++++------- ably/rest/rest.py | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index 200c8575..0158f56e 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -19,7 +19,7 @@ def wrapper(*args, **kwargs): caller_eventloop: events = asyncio.get_running_loop() except Exception: pass - ably_eventloop: events = AblyEventLoop.get_global().loop + ably_eventloop: events = AblyEventLoop.current().loop res = fn(*args, **kwargs) if asyncio.iscoroutine(res): diff --git a/ably/executer/eventloop.py b/ably/executer/eventloop.py index 6b433932..037c6902 100644 --- a/ably/executer/eventloop.py +++ b/ably/executer/eventloop.py @@ -6,19 +6,19 @@ class AblyEventLoop: loop: events = None thread: threading = None - __global_event_loop: 'AblyEventLoop' = None + app: 'AblyEventLoop' = None def __init__(self): self.loop = None self.thread = None @staticmethod - def get_global() -> 'AblyEventLoop': - if (AblyEventLoop.__global_event_loop is None or - AblyEventLoop.__global_event_loop.loop.is_closed()): - AblyEventLoop.__global_event_loop = AblyEventLoop() - AblyEventLoop.__global_event_loop.__create_if_not_exist() - return AblyEventLoop.__global_event_loop + def current() -> 'AblyEventLoop': + if (AblyEventLoop.app is None or + AblyEventLoop.app.loop.is_closed()): + AblyEventLoop.app = AblyEventLoop() + AblyEventLoop.app.__create_if_not_exist() + return AblyEventLoop.app def __create_if_not_exist(self): if self.loop is None or self.loop.is_closed(): diff --git a/ably/rest/rest.py b/ably/rest/rest.py index 30e6eb15..1f299c7b 100644 --- a/ably/rest/rest.py +++ b/ably/rest/rest.py @@ -156,8 +156,8 @@ def __exit__(self, *excinfo): async def close(self): await self.http.close() - AblyEventLoop.get_global().close() + AblyEventLoop.current().close() def close_sync(self): self.http.close() - AblyEventLoop.get_global().close() + AblyEventLoop.current().close() From 7f87a5139e422855fe5e033041af64f97de3c273 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sat, 30 Sep 2023 01:22:42 +0530 Subject: [PATCH 14/22] reformatted auth file --- ably/rest/auth.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ably/rest/auth.py b/ably/rest/auth.py index 78fde400..e753d521 100644 --- a/ably/rest/auth.py +++ b/ably/rest/auth.py @@ -11,6 +11,7 @@ from ably.decorator.sync import optional_sync from ably.types.options import Options + if TYPE_CHECKING: from ably.rest.rest import AblyRest from ably.realtime.realtime import AblyRealtime @@ -26,7 +27,6 @@ class Auth: - class Method: BASIC = "BASIC" TOKEN = "TOKEN" @@ -106,7 +106,8 @@ async def __authorize_when_necessary(self, token_params=None, auth_options=None, if self.ably._is_realtime: realtime_loop = self.ably.connection.connection_manager.options.loop - future = asyncio.run_coroutine_threadsafe(self.ably.connection.connection_manager.on_auth_updated(token_details), realtime_loop) + future = asyncio.run_coroutine_threadsafe( + self.ably.connection.connection_manager.on_auth_updated(token_details), realtime_loop) await asyncio.wrap_future(future) return token_details @@ -430,6 +431,6 @@ async def token_request_from_auth_url(self, method: str, url: str, token_params, token_request = response.text else: msg = 'auth_url responded with unacceptable content-type ' + content_type + \ - ', should be either text/plain, application/jwt or application/json', + ', should be either text/plain, application/jwt or application/json', raise AblyAuthException(msg, 401, 40170) return token_request From 51f1d819566e35970d048aec22ebe36bf7d3979e Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sat, 30 Sep 2023 02:10:36 +0530 Subject: [PATCH 15/22] simplified sync decorator --- ably/decorator/sync.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index 0158f56e..ecaca933 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -21,17 +21,16 @@ def wrapper(*args, **kwargs): pass ably_eventloop: events = AblyEventLoop.current().loop - res = fn(*args, **kwargs) - if asyncio.iscoroutine(res): - # Handle calls from ably_eventloop on the same loop, return awaitable - if caller_eventloop is not None and caller_eventloop == ably_eventloop: - return ably_eventloop.create_task(res) - - # Post external calls on ably_eventloop, return awaitable on calling eventloop - future = asyncio.run_coroutine_threadsafe(res, ably_eventloop) - if caller_eventloop is not None and caller_eventloop.is_running(): - return asyncio.wrap_future(future) - return future.result() - return res - - return wrapper + # Handle calls from ably_eventloop on the same loop, return awaitable + if caller_eventloop is not None and caller_eventloop == ably_eventloop: + return ably_eventloop.create_task(fn(*args, **kwargs)) + + # Handle calls from external eventloop, post them on ably_eventloop + future = asyncio.run_coroutine_threadsafe(fn(*args, **kwargs), ably_eventloop) + if caller_eventloop is not None and caller_eventloop.is_running(): + return asyncio.wrap_future(future) + + # If called from regular function, return blocking result + return future.result() + + return wrapper \ No newline at end of file From 0172888bda42d274b4150371d1bbdcef13d6e5d1 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sat, 30 Sep 2023 02:14:08 +0530 Subject: [PATCH 16/22] Revert "simplified sync decorator" This reverts commit 51f1d819566e35970d048aec22ebe36bf7d3979e. --- ably/decorator/sync.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index ecaca933..543914b2 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -21,16 +21,19 @@ def wrapper(*args, **kwargs): pass ably_eventloop: events = AblyEventLoop.current().loop - # Handle calls from ably_eventloop on the same loop, return awaitable - if caller_eventloop is not None and caller_eventloop == ably_eventloop: - return ably_eventloop.create_task(fn(*args, **kwargs)) - - # Handle calls from external eventloop, post them on ably_eventloop - future = asyncio.run_coroutine_threadsafe(fn(*args, **kwargs), ably_eventloop) - if caller_eventloop is not None and caller_eventloop.is_running(): - return asyncio.wrap_future(future) - - # If called from regular function, return blocking result - return future.result() - - return wrapper \ No newline at end of file + res = fn(*args, **kwargs) + if asyncio.iscoroutine(res): + # Handle calls from ably_eventloop on the same loop, return awaitable + if caller_eventloop is not None and caller_eventloop == ably_eventloop: + return ably_eventloop.create_task(res) + + # Handle calls from external eventloop, post them on ably_eventloop + future = asyncio.run_coroutine_threadsafe(res, ably_eventloop) + if caller_eventloop is not None and caller_eventloop.is_running(): + return asyncio.wrap_future(future) + + # If called from regular function, return blocking result + return future.result() + return res + + return wrapper From 1af4839f9a574ee684984a7345a0ac296f935e8e Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sat, 30 Sep 2023 02:25:39 +0530 Subject: [PATCH 17/22] Refactored AblyEventLoop to AppEventLoop --- ably/decorator/sync.py | 4 ++-- ably/executer/eventloop.py | 17 ++++++++--------- ably/rest/rest.py | 6 +++--- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index 543914b2..16095585 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -1,7 +1,7 @@ import functools from asyncio import events -from ably.executer.eventloop import AblyEventLoop +from ably.executer.eventloop import AppEventLoop def optional_sync(fn): @@ -19,7 +19,7 @@ def wrapper(*args, **kwargs): caller_eventloop: events = asyncio.get_running_loop() except Exception: pass - ably_eventloop: events = AblyEventLoop.current().loop + ably_eventloop: events = AppEventLoop.current().loop res = fn(*args, **kwargs) if asyncio.iscoroutine(res): diff --git a/ably/executer/eventloop.py b/ably/executer/eventloop.py index 037c6902..b433e0f3 100644 --- a/ably/executer/eventloop.py +++ b/ably/executer/eventloop.py @@ -3,22 +3,22 @@ from asyncio import events -class AblyEventLoop: +class AppEventLoop: loop: events = None thread: threading = None - app: 'AblyEventLoop' = None + active: 'AppEventLoop' = None def __init__(self): self.loop = None self.thread = None @staticmethod - def current() -> 'AblyEventLoop': - if (AblyEventLoop.app is None or - AblyEventLoop.app.loop.is_closed()): - AblyEventLoop.app = AblyEventLoop() - AblyEventLoop.app.__create_if_not_exist() - return AblyEventLoop.app + def current() -> 'AppEventLoop': + if (AppEventLoop.active is None or + AppEventLoop.active.loop.is_closed()): + AppEventLoop.active = AppEventLoop() + AppEventLoop.active.__create_if_not_exist() + return AppEventLoop.active def __create_if_not_exist(self): if self.loop is None or self.loop.is_closed(): @@ -31,7 +31,6 @@ def __create_if_not_exist(self): def close(self) -> events: if self.loop is not None and not self.loop.is_closed: - # https://stackoverflow.com/questions/46093238/python-asyncio-event-loop-does-not-seem-to-stop-when-stop-method-is-called self.loop.call_soon_threadsafe(self.loop.stop) self.thread.join() self.loop.close() diff --git a/ably/rest/rest.py b/ably/rest/rest.py index 1f299c7b..3c8cb984 100644 --- a/ably/rest/rest.py +++ b/ably/rest/rest.py @@ -3,7 +3,7 @@ from urllib.parse import urlencode from ably.decorator.sync import optional_sync -from ably.executer.eventloop import AblyEventLoop +from ably.executer.eventloop import AppEventLoop from ably.http.http import Http from ably.http.paginatedresult import PaginatedResult, HttpPaginatedResponse from ably.http.paginatedresult import format_params @@ -156,8 +156,8 @@ def __exit__(self, *excinfo): async def close(self): await self.http.close() - AblyEventLoop.current().close() + AppEventLoop.current().close() def close_sync(self): self.http.close() - AblyEventLoop.current().close() + AppEventLoop.current().close() From 7edff52b1af9f80da4bc5b72b3cb4df61301d7cd Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sat, 30 Sep 2023 03:04:27 +0530 Subject: [PATCH 18/22] Added eventloop close decorator --- ably/decorator/sync.py | 25 +++++++++++++++++++++++++ ably/rest/rest.py | 11 +++-------- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index 16095585..f3577072 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -37,3 +37,28 @@ def wrapper(*args, **kwargs): return res return wrapper + + +def close_app_eventloop(fn): + import asyncio + + @functools.wraps(fn) + def wrapper(*args, **kwargs): + + caller_eventloop = None + try: + caller_eventloop: events = asyncio.get_running_loop() + except Exception: + pass + + app_eventloop: events = AppEventLoop.current() + if caller_eventloop is not None: + app_eventloop.close() + return caller_eventloop.create_task(fn(*args, **kwargs)) + else: + future = asyncio.run_coroutine_threadsafe(fn(*args, **kwargs), app_eventloop.loop) + result = future.result() + app_eventloop.close() + return result + + return wrapper diff --git a/ably/rest/rest.py b/ably/rest/rest.py index 3c8cb984..8ceb67f2 100644 --- a/ably/rest/rest.py +++ b/ably/rest/rest.py @@ -2,8 +2,7 @@ from typing import Optional from urllib.parse import urlencode -from ably.decorator.sync import optional_sync -from ably.executer.eventloop import AppEventLoop +from ably.decorator.sync import optional_sync, close_app_eventloop from ably.http.http import Http from ably.http.paginatedresult import PaginatedResult, HttpPaginatedResponse from ably.http.paginatedresult import format_params @@ -152,12 +151,8 @@ async def __aexit__(self, *excinfo): await self.close() def __exit__(self, *excinfo): - self.close_sync() + self.close() + @close_app_eventloop async def close(self): await self.http.close() - AppEventLoop.current().close() - - def close_sync(self): - self.http.close() - AppEventLoop.current().close() From f4608cd3382f4f6517f932619841327e1112d627 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sat, 30 Sep 2023 03:45:55 +0530 Subject: [PATCH 19/22] Updated eventloop to stop instead of close --- ably/executer/eventloop.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/ably/executer/eventloop.py b/ably/executer/eventloop.py index b433e0f3..b8f0b4f5 100644 --- a/ably/executer/eventloop.py +++ b/ably/executer/eventloop.py @@ -4,8 +4,8 @@ class AppEventLoop: - loop: events = None - thread: threading = None + loop: events + thread: threading active: 'AppEventLoop' = None def __init__(self): @@ -15,7 +15,7 @@ def __init__(self): @staticmethod def current() -> 'AppEventLoop': if (AppEventLoop.active is None or - AppEventLoop.active.loop.is_closed()): + not AppEventLoop.active.loop.is_running()): AppEventLoop.active = AppEventLoop() AppEventLoop.active.__create_if_not_exist() return AppEventLoop.active @@ -30,7 +30,6 @@ def __create_if_not_exist(self): self.thread.start() def close(self) -> events: - if self.loop is not None and not self.loop.is_closed: + if self.loop is not None and self.loop.is_running(): self.loop.call_soon_threadsafe(self.loop.stop) self.thread.join() - self.loop.close() From 60874360075a4dda0d1d90e40aa8b754cbcddf07 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sat, 30 Sep 2023 12:39:06 +0530 Subject: [PATCH 20/22] Refactored code to run async safe --- ably/decorator/sync.py | 11 ++++++++--- ably/http/http.py | 6 +++--- ably/http/paginatedresult.py | 10 +++++----- ably/rest/auth.py | 12 ++++++------ ably/rest/channel.py | 14 +++++++------- ably/rest/rest.py | 6 +++--- 6 files changed, 32 insertions(+), 27 deletions(-) diff --git a/ably/decorator/sync.py b/ably/decorator/sync.py index f3577072..b26dcd44 100644 --- a/ably/decorator/sync.py +++ b/ably/decorator/sync.py @@ -4,11 +4,14 @@ from ably.executer.eventloop import AppEventLoop -def optional_sync(fn): +def run_safe(fn): ''' - Enables async function to be used as both sync and async function. + USAGE : + If called from an eventloop or coroutine, returns a future, doesn't block external eventloop. + If called from a regular function, returns a blocking result. Also makes async/sync workflow thread safe. This decorator should only be used on async methods/coroutines. + Completely safe to use for existing async users. ''' import asyncio @@ -27,8 +30,10 @@ def wrapper(*args, **kwargs): if caller_eventloop is not None and caller_eventloop == ably_eventloop: return ably_eventloop.create_task(res) - # Handle calls from external eventloop, post them on ably_eventloop future = asyncio.run_coroutine_threadsafe(res, ably_eventloop) + + # Handle calls from external eventloop, post them on ably_eventloop + # Return future back to external_eventloop if caller_eventloop is not None and caller_eventloop.is_running(): return asyncio.wrap_future(future) diff --git a/ably/http/http.py b/ably/http/http.py index 7d550dd5..60c1b1d0 100644 --- a/ably/http/http.py +++ b/ably/http/http.py @@ -7,7 +7,7 @@ import httpx import msgpack -from ably.decorator.sync import optional_sync +from ably.decorator.sync import run_safe from ably.rest.auth import Auth from ably.http.httputils import HttpUtils from ably.transport.defaults import Defaults @@ -132,7 +132,7 @@ def __init__(self, ably, options): self.__host_expires = None self.__client = httpx.AsyncClient(http2=True) - @optional_sync + @run_safe async def close(self): await self.__client.aclose() @@ -158,7 +158,7 @@ def get_rest_hosts(self): hosts.insert(0, host) return hosts - @optional_sync + @run_safe @reauth_if_expired async def make_request(self, method, path, version=None, headers=None, body=None, skip_auth=False, timeout=None, raise_on_error=True): diff --git a/ably/http/paginatedresult.py b/ably/http/paginatedresult.py index 39b10cd6..70da9174 100644 --- a/ably/http/paginatedresult.py +++ b/ably/http/paginatedresult.py @@ -2,7 +2,7 @@ import logging from urllib.parse import urlencode -from ably.decorator.sync import optional_sync +from ably.decorator.sync import run_safe from ably.http.http import Request from ably.util import case @@ -66,11 +66,11 @@ def has_next(self): def is_last(self): return not self.has_next() - @optional_sync + @run_safe async def first(self): return await self.__get_rel(self.__rel_first) if self.__rel_first else None - @optional_sync + @run_safe async def next(self): return await self.__get_rel(self.__rel_next) if self.__rel_next else None @@ -80,7 +80,7 @@ async def __get_rel(self, rel_req): return await self.paginated_query_with_request(self.__http, rel_req, self.__response_processor) @classmethod - @optional_sync + @run_safe async def paginated_query(cls, http, method='GET', url='/', version=None, body=None, headers=None, response_processor=None, raise_on_error=True): @@ -90,7 +90,7 @@ async def paginated_query(cls, http, method='GET', url='/', version=None, body=N return await cls.paginated_query_with_request(http, req, response_processor) @classmethod - @optional_sync + @run_safe async def paginated_query_with_request(cls, http, request, response_processor, raise_on_error=True): response = await http.make_request( diff --git a/ably/rest/auth.py b/ably/rest/auth.py index e753d521..fa93b192 100644 --- a/ably/rest/auth.py +++ b/ably/rest/auth.py @@ -9,7 +9,7 @@ import uuid import httpx -from ably.decorator.sync import optional_sync +from ably.decorator.sync import run_safe from ably.types.options import Options if TYPE_CHECKING: @@ -87,7 +87,7 @@ def __init__(self, ably: Union[AblyRest, AblyRealtime], options: Options): raise ValueError("Can't authenticate via token, must provide " "auth_callback, auth_url, key, token or a TokenDetail") - @optional_sync + @run_safe async def get_auth_transport_param(self): auth_credentials = {} if self.auth_options.client_id: @@ -155,11 +155,11 @@ def token_details_has_expired(self): return expires < timestamp + token_details.TOKEN_EXPIRY_BUFFER - @optional_sync + @run_safe async def authorize(self, token_params: Optional[dict] = None, auth_options=None): return await self.__authorize_when_necessary(token_params, auth_options, force=True) - @optional_sync + @run_safe async def request_token(self, token_params: Optional[dict] = None, # auth_options key_name: Optional[str] = None, key_secret: Optional[str] = None, auth_callback=None, @@ -238,7 +238,7 @@ async def request_token(self, token_params: Optional[dict] = None, log.debug("Token: %s" % str(response_dict.get("token"))) return TokenDetails.from_dict(response_dict) - @optional_sync + @run_safe async def create_token_request(self, token_params: Optional[dict] = None, key_name: Optional[str] = None, key_secret: Optional[str] = None, query_time=None): token_params = token_params or {} @@ -396,7 +396,7 @@ def _timestamp(self): def _random_nonce(self): return uuid.uuid4().hex[:16] - @optional_sync + @run_safe async def token_request_from_auth_url(self, method: str, url: str, token_params, headers, auth_params): body = None diff --git a/ably/rest/channel.py b/ably/rest/channel.py index 71ec21fc..defaa9af 100644 --- a/ably/rest/channel.py +++ b/ably/rest/channel.py @@ -9,7 +9,7 @@ from methoddispatch import SingleDispatch, singledispatch import msgpack -from ably.decorator.sync import optional_sync +from ably.decorator.sync import run_safe from ably.http.paginatedresult import PaginatedResult, format_params from ably.types.channeldetails import ChannelDetails from ably.types.message import Message, make_message_response_handler @@ -30,7 +30,7 @@ def __init__(self, ably, name, options): self.__presence = Presence(self) @catch_all - @optional_sync + @run_safe async def history(self, direction=None, limit: int = None, start=None, end=None): """Returns the history for this channel""" params = format_params({}, direction=direction, start=start, end=end, limit=limit) @@ -83,12 +83,12 @@ def _publish(self, arg, *args, **kwargs): raise TypeError('Unexpected type %s' % type(arg)) @_publish.register(Message) - @optional_sync + @run_safe async def publish_message(self, message, params=None, timeout=None): return await self.publish_messages([message], params, timeout=timeout) @_publish.register(list) - @optional_sync + @run_safe async def publish_messages(self, messages, params=None, timeout=None): request_body = self.__publish_request_body(messages) if not self.ably.options.use_binary_protocol: @@ -103,12 +103,12 @@ async def publish_messages(self, messages, params=None, timeout=None): return await self.ably.http.post(path, body=request_body, timeout=timeout) @_publish.register(str) - @optional_sync + @run_safe async def publish_name_data(self, name, data, timeout=None): messages = [Message(name, data)] return await self.publish_messages(messages, timeout=timeout) - @optional_sync + @run_safe async def publish(self, *args, **kwargs): """Publishes a message on this channel. @@ -137,7 +137,7 @@ async def publish(self, *args, **kwargs): return await self._publish(*args, **kwargs) - @optional_sync + @run_safe async def status(self): """Retrieves current channel active status with no. of publishers, subscribers, presence_members etc""" diff --git a/ably/rest/rest.py b/ably/rest/rest.py index 8ceb67f2..235c8c2d 100644 --- a/ably/rest/rest.py +++ b/ably/rest/rest.py @@ -2,7 +2,7 @@ from typing import Optional from urllib.parse import urlencode -from ably.decorator.sync import optional_sync, close_app_eventloop +from ably.decorator.sync import run_safe, close_app_eventloop from ably.http.http import Http from ably.http.paginatedresult import PaginatedResult, HttpPaginatedResponse from ably.http.paginatedresult import format_params @@ -92,7 +92,7 @@ async def stats(self, direction: Optional[str] = None, start=None, end=None, par self.http, url=url, response_processor=stats_response_processor) @catch_all - @optional_sync + @run_safe async def time(self, timeout: Optional[float] = None) -> float: """Returns the current server time in ms since the unix epoch""" r = await self.http.get('/time', skip_auth=True, timeout=timeout) @@ -124,7 +124,7 @@ def options(self): def push(self): return self.__push - @optional_sync + @run_safe async def request(self, method: str, path: str, version: str, params: Optional[dict] = None, body=None, headers=None): if version is None: From fb1b79d3b6c5426c28951a73c720e99dc545fd55 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sat, 30 Sep 2023 12:53:57 +0530 Subject: [PATCH 21/22] Moved sync package under executer --- ably/decorator/__init__.py | 0 ably/{decorator/sync.py => executer/decorator.py} | 14 +++++++------- ably/http/http.py | 2 +- ably/http/paginatedresult.py | 2 +- ably/rest/auth.py | 2 +- ably/rest/channel.py | 2 +- ably/rest/rest.py | 2 +- 7 files changed, 12 insertions(+), 12 deletions(-) delete mode 100644 ably/decorator/__init__.py rename ably/{decorator/sync.py => executer/decorator.py} (83%) diff --git a/ably/decorator/__init__.py b/ably/decorator/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/ably/decorator/sync.py b/ably/executer/decorator.py similarity index 83% rename from ably/decorator/sync.py rename to ably/executer/decorator.py index b26dcd44..ce400ad7 100644 --- a/ably/decorator/sync.py +++ b/ably/executer/decorator.py @@ -22,18 +22,18 @@ def wrapper(*args, **kwargs): caller_eventloop: events = asyncio.get_running_loop() except Exception: pass - ably_eventloop: events = AppEventLoop.current().loop + app_loop: events = AppEventLoop.current().loop res = fn(*args, **kwargs) if asyncio.iscoroutine(res): - # Handle calls from ably_eventloop on the same loop, return awaitable - if caller_eventloop is not None and caller_eventloop == ably_eventloop: - return ably_eventloop.create_task(res) + # Handle calls from app eventloop on the same loop, return awaitable + if caller_eventloop is not None and caller_eventloop == app_loop: + return app_loop.create_task(res) - future = asyncio.run_coroutine_threadsafe(res, ably_eventloop) + future = asyncio.run_coroutine_threadsafe(res, app_loop) - # Handle calls from external eventloop, post them on ably_eventloop - # Return future back to external_eventloop + # Handle calls from external eventloop, post them on app eventloop + # Return awaitable back to external_eventloop if caller_eventloop is not None and caller_eventloop.is_running(): return asyncio.wrap_future(future) diff --git a/ably/http/http.py b/ably/http/http.py index 60c1b1d0..08d164ce 100644 --- a/ably/http/http.py +++ b/ably/http/http.py @@ -7,7 +7,7 @@ import httpx import msgpack -from ably.decorator.sync import run_safe +from ably.executer.decorator import run_safe from ably.rest.auth import Auth from ably.http.httputils import HttpUtils from ably.transport.defaults import Defaults diff --git a/ably/http/paginatedresult.py b/ably/http/paginatedresult.py index 70da9174..8e784afe 100644 --- a/ably/http/paginatedresult.py +++ b/ably/http/paginatedresult.py @@ -2,7 +2,7 @@ import logging from urllib.parse import urlencode -from ably.decorator.sync import run_safe +from ably.executer.decorator import run_safe from ably.http.http import Request from ably.util import case diff --git a/ably/rest/auth.py b/ably/rest/auth.py index fa93b192..5a94deec 100644 --- a/ably/rest/auth.py +++ b/ably/rest/auth.py @@ -9,7 +9,7 @@ import uuid import httpx -from ably.decorator.sync import run_safe +from ably.executer.decorator import run_safe from ably.types.options import Options if TYPE_CHECKING: diff --git a/ably/rest/channel.py b/ably/rest/channel.py index defaa9af..1dc5e9c6 100644 --- a/ably/rest/channel.py +++ b/ably/rest/channel.py @@ -9,7 +9,7 @@ from methoddispatch import SingleDispatch, singledispatch import msgpack -from ably.decorator.sync import run_safe +from ably.executer.decorator import run_safe from ably.http.paginatedresult import PaginatedResult, format_params from ably.types.channeldetails import ChannelDetails from ably.types.message import Message, make_message_response_handler diff --git a/ably/rest/rest.py b/ably/rest/rest.py index 235c8c2d..907d616e 100644 --- a/ably/rest/rest.py +++ b/ably/rest/rest.py @@ -2,7 +2,7 @@ from typing import Optional from urllib.parse import urlencode -from ably.decorator.sync import run_safe, close_app_eventloop +from ably.executer.decorator import run_safe, close_app_eventloop from ably.http.http import Http from ably.http.paginatedresult import PaginatedResult, HttpPaginatedResponse from ably.http.paginatedresult import format_params From 437fc489c6e5851bfae26f8825a9cdeacc4cb23d Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sun, 1 Oct 2023 11:49:05 +0530 Subject: [PATCH 22/22] Added test for the sync api --- test/ably/sync/restauth_test.py | 652 ++++++++++++++++++++++++++++++++ test/ably/testapp.py | 25 +- test/ably/utils.py | 9 +- 3 files changed, 682 insertions(+), 4 deletions(-) create mode 100644 test/ably/sync/restauth_test.py diff --git a/test/ably/sync/restauth_test.py b/test/ably/sync/restauth_test.py new file mode 100644 index 00000000..60d67e71 --- /dev/null +++ b/test/ably/sync/restauth_test.py @@ -0,0 +1,652 @@ +import logging +import sys +import time +import uuid +import base64 + +from urllib.parse import parse_qs +import mock +import pytest +import respx +from httpx import Response, AsyncClient + +import ably +from ably import AblyRest +from ably import Auth +from ably import AblyAuthException +from ably.types.tokendetails import TokenDetails + +from test.ably.testapp import TestApp, TestAppSync +from test.ably.utils import VaryByProtocolTestsMetaclass, dont_vary_protocol, BaseAsyncTestCase + +if sys.version_info >= (3, 8): + from unittest.mock import AsyncMock +else: + from mock import AsyncMock + +log = logging.getLogger(__name__) + + +# does not make any request, no need to vary by protocol +class TestAuth(BaseAsyncTestCase): + def setUp(self): + self.test_vars = TestAppSync.get_test_vars() + + def test_auth_init_key_only(self): + ably = AblyRest(key=self.test_vars["keys"][0]["key_str"]) + assert Auth.Method.BASIC == ably.auth.auth_mechanism, "Unexpected Auth method mismatch" + assert ably.auth.auth_options.key_name == self.test_vars["keys"][0]['key_name'] + assert ably.auth.auth_options.key_secret == self.test_vars["keys"][0]['key_secret'] + + def test_auth_init_token_only(self): + ably = AblyRest(token="this_is_not_really_a_token") + + assert Auth.Method.TOKEN == ably.auth.auth_mechanism, "Unexpected Auth method mismatch" + + def test_auth_token_details(self): + td = TokenDetails() + ably = AblyRest(token_details=td) + + assert Auth.Method.TOKEN == ably.auth.auth_mechanism + assert ably.auth.token_details is td + + def test_auth_init_with_token_callback(self): + callback_called = [] + + async def token_callback(token_params): + callback_called.append(True) + return "this_is_not_really_a_token_request" + + ably = TestAppSync.get_ably_rest( + key=None, + key_name=self.test_vars["keys"][0]["key_name"], + auth_callback=token_callback) + + try: + ably.stats(None) + except Exception: + pass + + assert callback_called, "Token callback not called" + assert Auth.Method.TOKEN == ably.auth.auth_mechanism, "Unexpected Auth method mismatch" + + def test_auth_init_with_key_and_client_id(self): + ably = AblyRest(key=self.test_vars["keys"][0]["key_str"], client_id='testClientId') + + assert Auth.Method.BASIC == ably.auth.auth_mechanism, "Unexpected Auth method mismatch" + assert ably.auth.client_id == 'testClientId' + + def test_auth_init_with_token(self): + ably = TestAppSync.get_ably_rest(key=None, token="this_is_not_really_a_token") + assert Auth.Method.TOKEN == ably.auth.auth_mechanism, "Unexpected Auth method mismatch" + + # RSA11 + def test_request_basic_auth_header(self): + ably = AblyRest(key_secret='foo', key_name='bar') + + with mock.patch.object(AsyncClient, 'send') as get_mock: + try: + result = ably.http.get('/time', skip_auth=False) + except Exception: + pass + request = get_mock.call_args_list[0][0][0] + authorization = request.headers['Authorization'] + assert authorization == 'Basic %s' % base64.b64encode('bar:foo'.encode('ascii')).decode('utf-8') + + # RSA7e2 + def test_request_basic_auth_header_with_client_id(self): + ably = AblyRest(key_secret='foo', key_name='bar', client_id='client_id') + + with mock.patch.object(AsyncClient, 'send') as get_mock: + try: + ably.http.get('/time', skip_auth=False) + except Exception: + pass + request = get_mock.call_args_list[0][0][0] + client_id = request.headers['x-ably-clientid'] + assert client_id == base64.b64encode('client_id'.encode('ascii')).decode('utf-8') + + def test_request_token_auth_header(self): + ably = AblyRest(token='not_a_real_token') + + with mock.patch.object(AsyncClient, 'send') as get_mock: + try: + ably.http.get('/time', skip_auth=False) + except Exception: + pass + request = get_mock.call_args_list[0][0][0] + authorization = request.headers['Authorization'] + assert authorization == 'Bearer %s' % base64.b64encode('not_a_real_token'.encode('ascii')).decode('utf-8') + + def test_if_cant_authenticate_via_token(self): + with pytest.raises(ValueError): + AblyRest(use_token_auth=True) + + def test_use_auth_token(self): + ably = AblyRest(use_token_auth=True, key=self.test_vars["keys"][0]["key_str"]) + assert ably.auth.auth_mechanism == Auth.Method.TOKEN + + def test_with_client_id(self): + ably = AblyRest(use_token_auth=True, client_id='client_id', key=self.test_vars["keys"][0]["key_str"]) + assert ably.auth.auth_mechanism == Auth.Method.TOKEN + + def test_with_auth_url(self): + ably = AblyRest(auth_url='auth_url') + assert ably.auth.auth_mechanism == Auth.Method.TOKEN + + def test_with_auth_callback(self): + ably = AblyRest(auth_callback=lambda x: x) + assert ably.auth.auth_mechanism == Auth.Method.TOKEN + + def test_with_token(self): + ably = AblyRest(token='a token') + assert ably.auth.auth_mechanism == Auth.Method.TOKEN + + def test_default_ttl_is_1hour(self): + one_hour_in_ms = 60 * 60 * 1000 + assert TokenDetails.DEFAULTS['ttl'] == one_hour_in_ms + + def test_with_auth_method(self): + ably = AblyRest(token='a token', auth_method='POST') + assert ably.auth.auth_options.auth_method == 'POST' + + def test_with_auth_headers(self): + ably = AblyRest(token='a token', auth_headers={'h1': 'v1'}) + assert ably.auth.auth_options.auth_headers == {'h1': 'v1'} + + def test_with_auth_params(self): + ably = AblyRest(token='a token', auth_params={'p': 'v'}) + assert ably.auth.auth_options.auth_params == {'p': 'v'} + + def test_with_default_token_params(self): + ably = AblyRest(key=self.test_vars["keys"][0]["key_str"], + default_token_params={'ttl': 12345}) + assert ably.auth.auth_options.default_token_params == {'ttl': 12345} + + +class TestAuthAuthorize(BaseAsyncTestCase, metaclass=VaryByProtocolTestsMetaclass): + + def setUp(self): + self.ably = TestAppSync.get_ably_rest() + self.test_vars = TestAppSync.get_test_vars() + + def tearDown(self): + self.ably.close() + + def per_protocol_setup(self, use_binary_protocol): + self.ably.options.use_binary_protocol = use_binary_protocol + self.use_binary_protocol = use_binary_protocol + + def test_if_authorize_changes_auth_mechanism_to_token(self): + assert Auth.Method.BASIC == self.ably.auth.auth_mechanism, "Unexpected Auth method mismatch" + + self.ably.auth.authorize() + + assert Auth.Method.TOKEN == self.ably.auth.auth_mechanism, "Authorize should change the Auth method" + + # RSA10a + @dont_vary_protocol + def test_authorize_always_creates_new_token(self): + self.ably.auth.authorize({'capability': {'test': ['publish']}}) + self.ably.channels.test.publish('event', 'data') + + self.ably.auth.authorize({'capability': {'test': ['subscribe']}}) + with pytest.raises(AblyAuthException): + self.ably.channels.test.publish('event', 'data') + + def test_authorize_create_new_token_if_expired(self): + token = self.ably.auth.authorize() + with mock.patch('ably.rest.auth.Auth.token_details_has_expired', + return_value=True): + new_token = self.ably.auth.authorize() + + assert token is not new_token + + def test_authorize_returns_a_token_details(self): + token = self.ably.auth.authorize() + assert isinstance(token, TokenDetails) + + @dont_vary_protocol + def test_authorize_adheres_to_request_token(self): + token_params = {'ttl': 10, 'client_id': 'client_id'} + auth_params = {'auth_url': 'somewhere.com', 'query_time': True} + with mock.patch('ably.rest.auth.Auth.request_token', new_callable=AsyncMock) as request_mock: + self.ably.auth.authorize(token_params, auth_params) + + token_called, auth_called = request_mock.call_args + assert token_called[0] == token_params + + # Authorize may call request_token with some default auth_options. + for arg, value in auth_params.items(): + assert auth_called[arg] == value, "%s called with wrong value: %s" % (arg, value) + + def test_with_token_str_https(self): + token = self.ably.auth.authorize() + token = token.token + ably = TestAppSync.get_ably_rest(key=None, token=token, tls=True, + use_binary_protocol=self.use_binary_protocol) + ably.channels.test_auth_with_token_str.publish('event', 'foo_bar') + ably.close() + + def test_with_token_str_http(self): + token = self.ably.auth.authorize() + token = token.token + ably = TestAppSync.get_ably_rest(key=None, token=token, tls=False, + use_binary_protocol=self.use_binary_protocol) + ably.channels.test_auth_with_token_str.publish('event', 'foo_bar') + ably.close() + + def test_if_default_client_id_is_used(self): + ably = TestAppSync.get_ably_rest(client_id='my_client_id', + use_binary_protocol=self.use_binary_protocol) + token = ably.auth.authorize() + assert token.client_id == 'my_client_id' + ably.close() + + # RSA10j + def test_if_parameters_are_stored_and_used_as_defaults(self): + # Define some parameters + auth_options = dict(self.ably.auth.auth_options.auth_options) + auth_options['auth_headers'] = {'a_headers': 'a_value'} + self.ably.auth.authorize({'ttl': 555}, auth_options) + with mock.patch('ably.rest.auth.Auth.request_token', + wraps=self.ably.auth.request_token) as request_mock: + self.ably.auth.authorize() + + token_called, auth_called = request_mock.call_args + assert token_called[0] == {'ttl': 555} + assert auth_called['auth_headers'] == {'a_headers': 'a_value'} + + # Different parameters, should completely replace the first ones, not merge + auth_options = dict(self.ably.auth.auth_options.auth_options) + auth_options['auth_headers'] = None + self.ably.auth.authorize({}, auth_options) + with mock.patch('ably.rest.auth.Auth.request_token', + wraps=self.ably.auth.request_token) as request_mock: + self.ably.auth.authorize() + + token_called, auth_called = request_mock.call_args + assert token_called[0] == {} + assert auth_called['auth_headers'] is None + + # RSA10g + def test_timestamp_is_not_stored(self): + # authorize once with arbitrary defaults + auth_options = dict(self.ably.auth.auth_options.auth_options) + auth_options['auth_headers'] = {'a_headers': 'a_value'} + token_1 = self.ably.auth.authorize( + {'ttl': 60 * 1000, 'client_id': 'new_id'}, + auth_options) + assert isinstance(token_1, TokenDetails) + + # call authorize again with timestamp set + timestamp = self.ably.time() + with mock.patch('ably.rest.auth.TokenRequest', + wraps=ably.types.tokenrequest.TokenRequest) as tr_mock: + auth_options = dict(self.ably.auth.auth_options.auth_options) + auth_options['auth_headers'] = {'a_headers': 'a_value'} + token_2 = self.ably.auth.authorize( + {'ttl': 60 * 1000, 'client_id': 'new_id', 'timestamp': timestamp}, + auth_options) + assert isinstance(token_2, TokenDetails) + assert token_1 != token_2 + assert tr_mock.call_args[1]['timestamp'] == timestamp + + # call authorize again with no params + with mock.patch('ably.rest.auth.TokenRequest', + wraps=ably.types.tokenrequest.TokenRequest) as tr_mock: + token_4 = self.ably.auth.authorize() + assert isinstance(token_4, TokenDetails) + assert token_2 != token_4 + assert tr_mock.call_args[1]['timestamp'] != timestamp + + def test_client_id_precedence(self): + client_id = uuid.uuid4().hex + overridden_client_id = uuid.uuid4().hex + ably = TestAppSync.get_ably_rest( + use_binary_protocol=self.use_binary_protocol, + client_id=client_id, + default_token_params={'client_id': overridden_client_id}) + token = ably.auth.authorize() + assert token.client_id == client_id + assert ably.auth.client_id == client_id + + channel = ably.channels[ + self.get_channel_name('test_client_id_precedence')] + channel.publish('test', 'data') + history = channel.history() + assert history.items[0].client_id == client_id + ably.close() + + +class TestRequestToken(BaseAsyncTestCase, metaclass=VaryByProtocolTestsMetaclass): + + def setUp(self): + self.test_vars = TestAppSync.get_test_vars() + + def per_protocol_setup(self, use_binary_protocol): + self.use_binary_protocol = use_binary_protocol + + def test_with_key(self): + ably = TestAppSync.get_ably_rest(use_binary_protocol=self.use_binary_protocol) + + token_details = ably.auth.request_token() + assert isinstance(token_details, TokenDetails) + ably.close() + + ably = TestAppSync.get_ably_rest(key=None, token_details=token_details, + use_binary_protocol=self.use_binary_protocol) + channel = self.get_channel_name('test_request_token_with_key') + + ably.channels[channel].publish('event', 'foo') + + history = ably.channels[channel].history() + assert history.items[0].data == 'foo' + ably.close() + + @dont_vary_protocol + @respx.mock + def test_with_auth_url_headers_and_params_POST(self): # noqa: N802 + url = 'http://www.example.com' + headers = {'foo': 'bar'} + ably = TestAppSync.get_ably_rest(key=None, auth_url=url) + + auth_params = {'foo': 'auth', 'spam': 'eggs'} + token_params = {'foo': 'token'} + auth_route = respx.post(url) + + def call_back(request): + assert request.headers['content-type'] == 'application/x-www-form-urlencoded' + assert headers['foo'] == request.headers['foo'] + + # TokenParams has precedence + assert parse_qs(request.content.decode('utf-8')) == {'foo': ['token'], 'spam': ['eggs']} + return Response( + status_code=200, + content="token_string", + headers={ + "Content-Type": "text/plain", + } + ) + + auth_route.side_effect = call_back + token_details = ably.auth.request_token( + token_params=token_params, auth_url=url, auth_headers=headers, + auth_method='POST', auth_params=auth_params) + + assert 1 == auth_route.called + assert isinstance(token_details, TokenDetails) + assert 'token_string' == token_details.token + ably.close() + + @dont_vary_protocol + @respx.mock + def test_with_auth_url_headers_and_params_GET(self): # noqa: N802 + url = 'http://www.example.com' + headers = {'foo': 'bar'} + ably = TestAppSync.get_ably_rest( + key=None, auth_url=url, + auth_headers={'this': 'will_not_be_used'}, + auth_params={'this': 'will_not_be_used'}) + + auth_params = {'foo': 'auth', 'spam': 'eggs'} + token_params = {'foo': 'token'} + auth_route = respx.get(url, params={'foo': ['token'], 'spam': ['eggs']}) + + def call_back(request): + assert request.headers['foo'] == 'bar' + assert 'this' not in request.headers + assert not request.content + + return Response( + status_code=200, + json={'issued': 1, 'token': 'another_token_string'} + ) + auth_route.side_effect = call_back + token_details = ably.auth.request_token( + token_params=token_params, auth_url=url, auth_headers=headers, + auth_params=auth_params) + assert 'another_token_string' == token_details.token + ably.close() + + @dont_vary_protocol + def test_with_callback(self): + called_token_params = {'ttl': '3600000'} + + def callback(token_params): + assert token_params == called_token_params + return 'token_string' + + ably = TestAppSync.get_ably_rest(key=None, auth_callback=callback) + + token_details = ably.auth.request_token( + token_params=called_token_params, auth_callback=callback) + assert isinstance(token_details, TokenDetails) + assert 'token_string' == token_details.token + + def callback(token_params): + assert token_params == called_token_params + return TokenDetails(token='another_token_string') + + token_details = ably.auth.request_token( + token_params=called_token_params, auth_callback=callback) + assert 'another_token_string' == token_details.token + ably.close() + + @dont_vary_protocol + @respx.mock + def test_when_auth_url_has_query_string(self): + url = 'http://www.example.com?with=query' + headers = {'foo': 'bar'} + ably = TestAppSync.get_ably_rest(key=None, auth_url=url) + auth_route = respx.get('http://www.example.com', params={'with': 'query', 'spam': 'eggs'}).mock( + return_value=Response(status_code=200, content='token_string', headers={"Content-Type": "text/plain"})) + ably.auth.request_token(auth_url=url, + auth_headers=headers, + auth_params={'spam': 'eggs'}) + assert auth_route.called + ably.close() + + @dont_vary_protocol + def test_client_id_null_for_anonymous_auth(self): + ably = TestAppSync.get_ably_rest( + key=None, + key_name=self.test_vars["keys"][0]["key_name"], + key_secret=self.test_vars["keys"][0]["key_secret"]) + token = ably.auth.authorize() + + assert isinstance(token, TokenDetails) + assert token.client_id is None + assert ably.auth.client_id is None + ably.close() + + @dont_vary_protocol + def test_client_id_null_until_auth(self): + client_id = uuid.uuid4().hex + token_ably = TestAppSync.get_ably_rest( + default_token_params={'client_id': client_id}) + # before auth, client_id is None + assert token_ably.auth.client_id is None + + token = token_ably.auth.authorize() + assert isinstance(token, TokenDetails) + + # after auth, client_id is defined + assert token.client_id == client_id + assert token_ably.auth.client_id == client_id + token_ably.close() + + +class TestRenewToken(BaseAsyncTestCase): + + def setUp(self): + self.test_vars = TestAppSync.get_test_vars() + self.host = 'fake-host.ably.io' + self.ably = TestAppSync.get_ably_rest(use_binary_protocol=False, rest_host=self.host) + # with headers + self.publish_attempts = 0 + self.channel = uuid.uuid4().hex + tokens = ['a_token', 'another_token'] + headers = {'Content-Type': 'application/json'} + self.mocked_api = respx.mock(base_url='https://{}'.format(self.host)) + self.request_token_route = self.mocked_api.post( + "/keys/{}/requestToken".format(self.test_vars["keys"][0]['key_name']), + name="request_token_route") + self.request_token_route.return_value = Response( + status_code=200, + headers=headers, + json={ + 'token': tokens[self.request_token_route.call_count - 1], + 'expires': (time.time() + 60) * 1000 + }, + ) + + def call_back(request): + self.publish_attempts += 1 + if self.publish_attempts in [1, 3]: + return Response( + status_code=201, + headers=headers, + json=[], + ) + return Response( + status_code=401, + headers=headers, + json={ + 'error': {'message': 'Authentication failure', 'statusCode': 401, 'code': 40140} + }, + ) + + self.publish_attempt_route = self.mocked_api.post("/channels/{}/messages".format(self.channel), + name="publish_attempt_route") + self.publish_attempt_route.side_effect = call_back + self.mocked_api.start() + + def tearDown(self): + # We need to have quiet here in order to do not have check if all endpoints were called + self.mocked_api.stop(quiet=True) + self.mocked_api.reset() + self.ably.close() + + # RSA4b + def test_when_renewable(self): + self.ably.auth.authorize() + self.ably.channels[self.channel].publish('evt', 'msg') + assert self.mocked_api["request_token_route"].call_count == 1 + assert self.publish_attempts == 1 + + # Triggers an authentication 401 failure which should automatically request a new token + self.ably.channels[self.channel].publish('evt', 'msg') + assert self.mocked_api["request_token_route"].call_count == 2 + assert self.publish_attempts == 3 + + # RSA4a + def test_when_not_renewable(self): + self.ably.close() + + self.ably = TestAppSync.get_ably_rest( + key=None, + rest_host=self.host, + token='token ID cannot be used to create a new token', + use_binary_protocol=False) + self.ably.channels[self.channel].publish('evt', 'msg') + assert self.publish_attempts == 1 + + publish = self.ably.channels[self.channel].publish + + match = "Need a new token but auth_options does not include a way to request one" + with pytest.raises(AblyAuthException, match=match): + publish('evt', 'msg') + + assert not self.mocked_api["request_token_route"].called + + # RSA4a + def test_when_not_renewable_with_token_details(self): + token_details = TokenDetails(token='a_dummy_token') + self.ably = TestAppSync.get_ably_rest( + key=None, + rest_host=self.host, + token_details=token_details, + use_binary_protocol=False) + self.ably.channels[self.channel].publish('evt', 'msg') + assert self.mocked_api["publish_attempt_route"].call_count == 1 + + publish = self.ably.channels[self.channel].publish + + match = "Need a new token but auth_options does not include a way to request one" + with pytest.raises(AblyAuthException, match=match): + publish('evt', 'msg') + + assert not self.mocked_api["request_token_route"].called + + +class TestRenewExpiredToken(BaseAsyncTestCase): + + def setUp(self): + self.test_vars = TestAppSync.get_test_vars() + self.publish_attempts = 0 + self.channel = uuid.uuid4().hex + + self.host = 'fake-host.ably.io' + key = self.test_vars["keys"][0]['key_name'] + headers = {'Content-Type': 'application/json'} + + self.mocked_api = respx.mock(base_url='https://{}'.format(self.host)) + self.request_token_route = self.mocked_api.post("/keys/{}/requestToken".format(key), + name="request_token_route") + self.request_token_route.return_value = Response( + status_code=200, + headers=headers, + json={ + 'token': 'a_token', + 'expires': int(time.time() * 1000), # Always expires + } + ) + self.publish_message_route = self.mocked_api.post("/channels/{}/messages".format(self.channel), + name="publish_message_route") + self.time_route = self.mocked_api.get("/time", name="time_route") + self.time_route.return_value = Response( + status_code=200, + headers=headers, + json=[int(time.time() * 1000)] + ) + + def cb_publish(request): + self.publish_attempts += 1 + if self.publish_fail: + self.publish_fail = False + return Response( + status_code=401, + json={ + 'error': {'message': 'Authentication failure', 'statusCode': 401, 'code': 40140} + } + ) + return Response( + status_code=201, + json='[]' + ) + + self.publish_message_route.side_effect = cb_publish + self.mocked_api.start() + + def tearDown(self): + self.mocked_api.stop(quiet=True) + self.mocked_api.reset() + + # RSA4b1 + def test_query_time_false(self): + ably = TestAppSync.get_ably_rest(rest_host=self.host) + ably.auth.authorize() + self.publish_fail = True + ably.channels[self.channel].publish('evt', 'msg') + assert self.publish_attempts == 2 + ably.close() + + # RSA4b1 + def test_query_time_true(self): + ably = TestAppSync.get_ably_rest(query_time=True, rest_host=self.host) + ably.auth.authorize() + self.publish_fail = False + ably.channels[self.channel].publish('evt', 'msg') + assert self.publish_attempts == 1 + ably.close() diff --git a/test/ably/testapp.py b/test/ably/testapp.py index 86741f3c..561d6e52 100644 --- a/test/ably/testapp.py +++ b/test/ably/testapp.py @@ -2,6 +2,7 @@ import os import logging +from ably.executer.decorator import run_safe from ably.rest.rest import AblyRest from ably.types.capability import Capability from ably.types.options import Options @@ -27,7 +28,6 @@ port = 8080 tls_port = 8081 - ably = AblyRest(token='not_a_real_token', port=port, tls_port=tls_port, tls=tls, environment=environment, @@ -65,7 +65,7 @@ async def get_test_vars(): TestApp.__test_vars = test_vars log.debug([(app_id, k.get("id", ""), k.get("value", "")) - for k in app_spec.get("keys", [])]) + for k in app_spec.get("keys", [])]) return TestApp.__test_vars @@ -113,3 +113,24 @@ async def clear_test_vars(): await ably.http.delete('/apps/' + test_vars['app_id']) TestApp.__test_vars = None await ably.close() + + +class TestAppSync: + + @staticmethod + @run_safe + async def get_test_vars(): + return await TestApp.get_test_vars() + + @staticmethod + @run_safe + async def get_ably_rest(**kw): + test_vars = await TestApp.get_test_vars() + options = TestApp.get_options(test_vars, **kw) + options.update(kw) + return AblyRest(**options) + + @staticmethod + @run_safe + async def clear_test_vars(): + return await TestApp.clear_test_vars() diff --git a/test/ably/utils.py b/test/ably/utils.py index cb0a5b0d..77c55cf3 100644 --- a/test/ably/utils.py +++ b/test/ably/utils.py @@ -1,3 +1,4 @@ +import asyncio import functools import random import string @@ -87,7 +88,9 @@ def test_decorator(fn): @functools.wraps(fn) async def test_decorated(self, *args, **kwargs): patcher = patch() - await fn(self, *args, **kwargs) + res = fn(self, *args, **kwargs) + if asyncio.iscoroutine(res): + await res unpatch(patcher) assert len(responses) >= 1,\ @@ -144,7 +147,9 @@ def wrap_as(ttype, old_name, old_func): async def wrapper(self): if hasattr(self, 'per_protocol_setup'): self.per_protocol_setup(ttype == 'bin') - await old_func(self) + res = old_func(self) + if asyncio.iscoroutine(res): + await res wrapper.__name__ = old_name + '_' + ttype return wrapper