Skip to content

Commit

Permalink
Refactored eventloop and related decorators
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Oct 4, 2023
1 parent 75aaffe commit 031d11f
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 45 deletions.
3 changes: 2 additions & 1 deletion ably/executer/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def wrapper(self, *args, **kwargs):
res = fn(self, *args, **kwargs)
if asyncio.iscoroutine(res):
# Handle calls from app eventloop on the same loop, return awaitable
# Can't wait/force block same thread/eventloop, eventloop will be blocked
if caller_eventloop is not None and caller_eventloop == app_loop:
return res

Expand Down Expand Up @@ -102,7 +103,7 @@ def wrapper(self, *args, **kwargs):
# Handle calls from external eventloop, post them on app eventloop
# Return awaitable back to external_eventloop
future = asyncio.run_coroutine_threadsafe(res, app_loop)
return asyncio.wrap_future(future)
return asyncio.wrap_future(future, loop=caller_eventloop)

return res

Expand Down
36 changes: 11 additions & 25 deletions ably/executer/eventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import threading
from asyncio import events

from ably.executer.eventloop_helper import LoopHelper


class AppEventLoop:
_global: 'AppEventLoop' = None
Expand All @@ -18,43 +20,27 @@ def __init__(self, loop, thread=None):
self.__is_active = True

@staticmethod
def get_global() -> 'AppEventLoop':
if AppEventLoop._global is None or not AppEventLoop._global.__is_active:
AppEventLoop._global = AppEventLoop.create()
return AppEventLoop._global
def get_global(cls) -> 'AppEventLoop':
if cls._global is None or not cls._global.__is_active:
cls._global = cls.create(False)
return cls._global

@staticmethod
def create() -> 'AppEventLoop':
def create(cls, background=True) -> 'AppEventLoop':
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread = threading.Thread(target=loop.run_forever, daemon=background)
thread.start()
return AppEventLoop(loop, thread)
return cls(loop, thread)

@property
def loop(self):
return self.__loop

def run(self, coro, callback):
self.__loop.call_soon()

def run_sync(self, coro):
# todo - can only handle run_sync from different thread than app_loop
# caller_eventloop = None
# try:
# caller_eventloop: events = asyncio.get_running_loop()
# except Exception:
# pass
# Handle calls from app eventloop on the same loop, return awaitable
# if caller_eventloop is not None and caller_eventloop == self.loop:
# task = self.loop.create_task(coro)
# task.add_done_callback

future = asyncio.run_coroutine_threadsafe(coro, self.__loop)
return future.result()
return LoopHelper.force_sync(self.loop, coro)

def run_async(self, coro):
future = asyncio.run_coroutine_threadsafe(coro, self.__loop)
return asyncio.wrap_future(future)
return LoopHelper.run_safe_async(self.loop, coro)

def close(self) -> events:
if self.__thread is not None:
Expand Down
71 changes: 52 additions & 19 deletions ably/executer/eventloop_helper.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,52 @@
# from asyncio import events
#
# def run(loop: events, coro, callback):
# print("run")
#
# def run_safe(loop: events, coro, callback):
# print("run safe")
#
# def run_sync(loop: events, coro):
# print("run sync")
#
# def run_sync_safe(loop: events, coro):
# print("run sync safe")
#
# async def run_async(loop: events, coro):
# print("run async")
#
# async def run_async_safe(loop: events, coro):
# print("run async safe")
import asyncio
from asyncio import events
from concurrent.futures import Future


class LoopHelper:
@classmethod
def run(cls, loop: events, coro, callback):
raise "not implemented"

@classmethod
def run_safe(cls, loop: events, coro, callback):
raise "not implemented"

#
@classmethod
def force_sync(cls, loop: events, coro):
future: Future
caller_eventloop = None
try:
caller_eventloop: events = asyncio.get_running_loop()
except Exception:
pass
# Handle calls from app eventloop on the same loop, return awaitable
if caller_eventloop is not None and caller_eventloop == loop:
raise "can't wait/force sync on the same loop, eventloop will be blocked"

future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result()

@classmethod
def run_safe_async(cls, loop: events, coro):
caller_eventloop = None
try:
caller_eventloop: events = asyncio.get_running_loop()
except Exception:
pass
if caller_eventloop is not None and caller_eventloop == loop:
return coro

future = asyncio.run_coroutine_threadsafe(coro, loop)
return asyncio.wrap_future(future, loop=caller_eventloop)

# async def force_regular_fn_async(loop: events, regular_fn, regular_fn_args):
# # Run in the default loop's executor
# return await loop.run_in_executor(None, blocking_fn, blocking_fn_args)

@classmethod
# Run blocking function in default threadpool executor.
async def run_blocking_fn_async(cls, loop: events, blocking_fn, blocking_fn_args):
# Run in the default loop's executor
return await loop.run_in_executor(None, blocking_fn, blocking_fn_args)

0 comments on commit 031d11f

Please sign in to comment.