Skip to content

Commit

Permalink
Updated eventloop and sync_enabled to be provided from options
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Oct 3, 2023
1 parent a718440 commit b152e63
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 103 deletions.
21 changes: 5 additions & 16 deletions ably/executer/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def wrapper(self, *args, **kwargs):
caller_eventloop: events = asyncio.get_running_loop()
except Exception:
pass
app_loop: events = AppEventLoop.get_global().loop
app_loop: events = self.app_loop.loop

res = fn(self, *args, **kwargs)
if asyncio.iscoroutine(res):
Expand Down Expand Up @@ -85,15 +85,15 @@ def run_safe_async(fn):
import asyncio

@functools.wraps(fn)
def wrapper(*args, **kwargs):
def wrapper(self, *args, **kwargs):
caller_eventloop = None
try:
caller_eventloop: events = asyncio.get_running_loop()
except Exception:
pass
app_loop: events = AppEventLoop.get_global().loop
app_loop: events = self.app_loop.loop

res = fn(*args, **kwargs)
res = fn(self, *args, **kwargs)
if asyncio.iscoroutine(res):
# Handle calls from app eventloop on the same loop, return awaitable
if caller_eventloop is not None and caller_eventloop == app_loop:
Expand All @@ -115,19 +115,8 @@ def close_app_eventloop(fn):
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):

app_loop = AppEventLoop.get_global()
# Handle result of the given async method, with blocking behaviour
caller_eventloop = None
try:
caller_eventloop: events = asyncio.get_running_loop()
except Exception:
pass
app_loop: events = self.app_loop

# Handle calls from app eventloop on the same loop, return awaitable
if caller_eventloop is not None and caller_eventloop == app_loop.loop:
return fn(self, *args, **kwargs)

# Return awaitable as is!
if not self.sync_enabled:
app_loop.close()
return fn(self, *args, **kwargs)
Expand Down
53 changes: 26 additions & 27 deletions ably/executer/eventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,36 @@
class AppEventLoop:
_global: 'AppEventLoop' = None

loop: events
thread: threading
is_active: bool
__loop: events
__thread: threading
__is_active: bool

def __init__(self):
self.loop = None
self.thread = None
self.is_active = False
def __init__(self, loop=None, thread=None):
if not loop.is_running:
raise Exception("Provided eventloop must be in running state")
self.__loop = loop
self.__thread = thread
self.__is_active = True

@staticmethod
def get_global() -> 'AppEventLoop':
if AppEventLoop._global is None or not AppEventLoop._global.is_active:
if AppEventLoop._global is None or not AppEventLoop._global.__is_active:
AppEventLoop._global = AppEventLoop.create(True)
return AppEventLoop._global

@staticmethod
def create(background=False) -> 'AppEventLoop':
app_loop = AppEventLoop()
app_loop.loop = asyncio.new_event_loop()
app_loop.thread = threading.Thread(target=app_loop.loop.run_forever, daemon=background)
app_loop.thread.start()
app_loop.is_active = True
return app_loop
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=background)
thread.start()
return AppEventLoop(loop, thread)

@property
def loop(self):
return self.__loop

def run(self, coro, callback):
self.loop.call_soon()
self.__loop.call_soon()

def run_sync(self, coro):
# todo - can only handle run_sync from different thread than app_loop
Expand All @@ -45,21 +49,16 @@ def run_sync(self, coro):
# task = self.loop.create_task(coro)
# task.add_done_callback

future = asyncio.run_coroutine_threadsafe(coro, self.loop)
future = asyncio.run_coroutine_threadsafe(coro, self.__loop)
return future.result()

def run_async(self, coro):
future = asyncio.run_coroutine_threadsafe(coro, self.loop)
future = asyncio.run_coroutine_threadsafe(coro, self.__loop)
return asyncio.wrap_future(future)

def _close(self):
if self.loop is not None and self.loop.is_running():
self.loop.call_soon_threadsafe(self.loop.stop)
if self.thread is not None:
self.thread.join()
self.is_active = False

def close(self) -> events:
if self == AppEventLoop._global: # Global eventloop is shared amongst multiple client instances.
return # It will be reused, and closed automatically once process ends.
self._close()
if self.__thread is not None:
if self.__loop is not None and self.__loop.is_running():
self.__loop.call_soon_threadsafe(self.__loop.stop)
self.__thread.join()
self.__is_active = False
39 changes: 19 additions & 20 deletions ably/executer/eventloop_helper.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
from asyncio import events


def run(loop: events, coro):
print("run")

def run_safe(loop: events, coro):
print("run safe")

def run_sync(loop: events, coro):
print("run sync")

def run_sync_safe(loop: events, coro):
print("run sync safe")

def run_async(loop: events, coro):
print("run async")

def run_async_safe(loop: events, coro):
print("run async safe")
# from asyncio import events
#
# def run(loop: events, coro, callback):
# print("run")
#
# def run_safe(loop: events, coro, callback):
# print("run safe")
#
# def run_sync(loop: events, coro):
# print("run sync")
#
# def run_sync_safe(loop: events, coro):
# print("run sync safe")
#
# async def run_async(loop: events, coro):
# print("run async")
#
# async def run_async_safe(loop: events, coro):
# print("run async safe")
4 changes: 4 additions & 0 deletions ably/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ async def put(self, url, headers=None, body=None, skip_auth=False, timeout=None)
def sync_enabled(self):
return self.__ably.sync_enabled

@property
def app_loop(self):
return self.__ably.app_loop

@property
def auth(self):
return self.__auth
Expand Down
4 changes: 4 additions & 0 deletions ably/http/paginatedresult.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ async def next(self):
def sync_enabled(self):
return self.__http.sync_enabled

@property
def app_loop(self):
return self.__http.app_loop

async def __get_rel(self, rel_req):
if rel_req is None:
return None
Expand Down
4 changes: 4 additions & 0 deletions ably/rest/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ async def create_token_request(self, token_params: Optional[dict] = None, key_na
def sync_enabled(self):
return self.ably.sync_enabled

@property
def app_loop(self):
return self.ably.app_loop

@property
def ably(self):
return self.__ably
Expand Down
4 changes: 4 additions & 0 deletions ably/rest/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ async def status(self):
def sync_enabled(self):
return self.ably.sync_enabled

@property
def app_loop(self):
return self.ably.app_loop

@property
def ably(self):
return self.__ably
Expand Down
19 changes: 9 additions & 10 deletions ably/rest/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,18 @@ def __init__(self, key: Optional[str] = None, token: Optional[str] = None,
else:
options = Options(**kwargs)

self.__loop = options.loop
self.__sync_enabled = options.sync_enabled
if self.__sync_enabled:
if options.loop is None:
self.__app_loop = AppEventLoop.create()
else:
self.__app_loop = AppEventLoop(loop=options.loop)

try:
self._is_realtime
except AttributeError:
self._is_realtime = False
self.__sync_enabled = False

self.__http = Http(self, options)
self.__auth = Auth(self, options)
self.__http.auth = self.__auth
Expand Down Expand Up @@ -120,14 +126,7 @@ def sync_enabled(self):

@property
def app_loop(self):
return self.__loop

@sync_enabled.setter
def sync_enabled(self, enable_sync):
self.__sync_enabled = enable_sync
if enable_sync:
if self.__loop is None:
self.__loop = AppEventLoop.get_global()
return self.__app_loop

@property
def http(self):
Expand Down
8 changes: 7 additions & 1 deletion ably/types/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realti
http_max_retry_count=None, http_max_retry_duration=None, fallback_hosts=None,
fallback_retry_timeout=None, disconnected_retry_timeout=None, idempotent_rest_publishing=None,
loop=None, auto_connect=True, suspended_retry_timeout=None, connectivity_check_url=None,
channel_retry_timeout=Defaults.channel_retry_timeout, add_request_ids=False, **kwargs):
channel_retry_timeout=Defaults.channel_retry_timeout, add_request_ids=False, sync_enabled=False,
**kwargs):

super().__init__(**kwargs)

Expand Down Expand Up @@ -70,6 +71,7 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realti
self.__disconnected_retry_timeout = disconnected_retry_timeout
self.__channel_retry_timeout = channel_retry_timeout
self.__idempotent_rest_publishing = idempotent_rest_publishing
self.__sync_enabled = sync_enabled
self.__loop = loop
self.__auto_connect = auto_connect
self.__connection_state_ttl = connection_state_ttl
Expand Down Expand Up @@ -222,6 +224,10 @@ def channel_retry_timeout(self):
def idempotent_rest_publishing(self):
return self.__idempotent_rest_publishing

@property
def sync_enabled(self):
return self.__sync_enabled

@property
def loop(self):
return self.__loop
Expand Down
4 changes: 4 additions & 0 deletions ably/types/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ async def history(self, limit=None, direction=None, start=None, end=None):
def sync_enabled(self):
return self.__http.sync_enabled

@property
def app_loop(self):
return self.__http.app_loop


def make_presence_response_handler(cipher):
def encrypted_presence_response_handler(response):
Expand Down
14 changes: 6 additions & 8 deletions test/ably/sync/sync_restauth_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ def test_auth_init_with_token(self):

# RSA11
def test_request_basic_auth_header(self):
ably = AblyRest(key_secret='foo', key_name='bar')
ably.sync_enabled = True
ably = AblyRest(key_secret='foo', key_name='bar', sync_enabled=True)
with mock.patch.object(AsyncClient, 'send') as get_mock:
try:
ably.http.get('/time', skip_auth=False)
Expand All @@ -91,11 +90,10 @@ def test_request_basic_auth_header(self):
request = get_mock.call_args_list[0][0][0]
authorization = request.headers['Authorization']
assert authorization == 'Basic %s' % base64.b64encode('bar:foo'.encode('ascii')).decode('utf-8')

ably.close()
# RSA7e2
def test_request_basic_auth_header_with_client_id(self):
ably = AblyRest(key_secret='foo', key_name='bar', client_id='client_id')
ably.sync_enabled = True
ably = AblyRest(key_secret='foo', key_name='bar', client_id='client_id', sync_enabled=True)
with mock.patch.object(AsyncClient, 'send') as get_mock:
try:
ably.http.get('/time', skip_auth=False)
Expand All @@ -104,10 +102,10 @@ def test_request_basic_auth_header_with_client_id(self):
request = get_mock.call_args_list[0][0][0]
client_id = request.headers['x-ably-clientid']
assert client_id == base64.b64encode('client_id'.encode('ascii')).decode('utf-8')
ably.close()

def test_request_token_auth_header(self):
ably = AblyRest(token='not_a_real_token')
ably.sync_enabled = True
ably = AblyRest(token='not_a_real_token', sync_enabled=True)
with mock.patch.object(AsyncClient, 'send') as get_mock:
try:
ably.http.get('/time', skip_auth=False)
Expand All @@ -116,7 +114,7 @@ def test_request_token_auth_header(self):
request = get_mock.call_args_list[0][0][0]
authorization = request.headers['Authorization']
assert authorization == 'Bearer %s' % base64.b64encode('not_a_real_token'.encode('ascii')).decode('utf-8')

ably.close()
def test_if_cant_authenticate_via_token(self):
with pytest.raises(ValueError):
AblyRest(use_token_auth=True)
Expand Down
Loading

0 comments on commit b152e63

Please sign in to comment.