diff --git a/ably/executer/eventloop.py b/ably/executer/eventloop.py index aca47b66..9b6bd111 100644 --- a/ably/executer/eventloop.py +++ b/ably/executer/eventloop.py @@ -30,6 +30,21 @@ def create() -> 'AppEventLoop': app_loop.is_active = True return app_loop + def run_sync(self, coro): + # todo - can only handle run_sync from different thread than app_loop + # caller_eventloop = None + # try: + # caller_eventloop: events = asyncio.get_running_loop() + # except Exception: + # pass + # Handle calls from app eventloop on the same loop, return awaitable + # if caller_eventloop is not None and caller_eventloop == self.loop: + # task = self.loop.create_task(coro) + # task.add_done_callback + + future = asyncio.run_coroutine_threadsafe(coro, self.loop) + return future.result() + def close(self) -> events: if self.loop is not None and self.loop.is_running(): self.loop.call_soon_threadsafe(self.loop.stop) diff --git a/test/ably/sync/sync_restchannelpublish_test.py b/test/ably/sync/sync_restchannelpublish_test.py index f0ccd371..7becbc9b 100644 --- a/test/ably/sync/sync_restchannelpublish_test.py +++ b/test/ably/sync/sync_restchannelpublish_test.py @@ -10,7 +10,7 @@ import msgpack import pytest -from ably import AblyException, IncompatibleClientIdException +from ably import AblyException, IncompatibleClientIdException, AblyRest from ably import api_version from ably.rest.auth import Auth from ably.types.message import Message @@ -522,14 +522,15 @@ def test_idempotent_mixed_ids(self): assert request_body[0]['id'] == 'foobar' assert 'id' not in request_body[1] - def get_ably_rest(self, *args, **kwargs): + def get_ably_rest(self, *args, **kwargs) -> AblyRest: kwargs['use_binary_protocol'] = self.use_binary_protocol return TestAppSync.get_ably_rest(*args, **kwargs) # RSL1k4 def test_idempotent_library_generated_retry(self): test_vars = TestAppSync.get_test_vars() - ably = self.get_ably_rest(idempotent_rest_publishing=True, fallback_hosts=[test_vars["host"]] * 3) + ably: AblyRest = self.get_ably_rest(idempotent_rest_publishing=True, + fallback_hosts=[test_vars["host"]] * 3) channel = ably.channels[self.get_channel_name()] state = {'failures': 0} @@ -550,7 +551,8 @@ async def side_effect(*args, **kwargs): assert state['failures'] == 2 history = channel.history() assert len(history.items) == 1 - client.aclose() + + ably.app_loop.run_sync(client.aclose()) ably.close() # RSL1k5