Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
gi0baro committed Dec 14, 2024
1 parent 47a0a04 commit 29b6e3a
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 159 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ lint-rust:
-A clippy::too-many-lines \
-A clippy::type-complexity \
-A clippy::unused-self \
-A clippy::used-underscore-binding \
-A clippy::used-underscore-items \
-A clippy::wrong-self-convention

.PHONY: lint
Expand Down
6 changes: 2 additions & 4 deletions rloop/_rloop.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ class EventLoop:
_watcher_child: Any

def _run(self): ...
def _wake(self): ...
def _call_soon(self, callback, args, context) -> CBHandle: ...
def _call_later(self, delay, callback, args, context) -> TimerHandle: ...
def _reader_add(self, fd, callback, args, context) -> CBHandle: ...
def _reader_rem(self, fd) -> bool: ...
Expand All @@ -45,7 +43,7 @@ class EventLoop:
def _sig_add(self, sig, callback, context): ...
def _sig_rem(self, sig) -> bool: ...
def _sig_clear(self): ...
def _sig_handle(self, sig) -> bool: ...
def _sig_ceval(self, noop: Any): ...
def _ssock_set(self, fd): ...
def _ssock_del(self, fd): ...
def call_soon(self, callback, *args, context=None) -> CBHandle: ...
def call_soon_threadsafe(self, callback, *args, context=None) -> CBHandle: ...
48 changes: 5 additions & 43 deletions rloop/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,16 @@ def _executor_shutdown(self, future):
# def _timer_handle_cancelled(self, handle):
# raise NotImplementedError

def call_soon(self, callback, *args, context=None) -> CBHandle:
return self._call_soon(callback, args, context or _copy_context())

def call_later(self, delay, callback, *args, context=None) -> Union[CBHandle, TimerHandle]:
if delay <= 0:
return self._call_soon(callback, args, context or _copy_context())
return self.call_soon(callback, *args, context=context or _copy_context())
delay = round(delay * 1_000_000)
return self._call_later(delay, callback, args, context or _copy_context())

def call_at(self, when, callback, *args, context=None) -> Union[CBHandle, TimerHandle]:
delay = round((when - self.time()) * 1_000_000)
if delay <= 0:
return self._call_soon(callback, args, context or _copy_context())
return self.call_soon(callback, *args, context=context or _copy_context())
return self._call_later(delay, callback, args, context or _copy_context())

def time(self) -> float:
Expand Down Expand Up @@ -269,11 +266,6 @@ def create_task(self, coro, *, name=None, context=None) -> _Task:
return task

#: threads methods
def call_soon_threadsafe(self, callback, *args, context=None) -> CBHandle:
rv = self._call_soon(callback, args, context or self._base_ctx)
self._wake()
return rv

def run_in_executor(self, executor, fn, *args):
if _iscoroutine(fn) or _iscoroutinefunction(fn):
raise TypeError('Coroutines cannot be used with executors')
Expand Down Expand Up @@ -675,45 +667,21 @@ def _ssock_start(self):
try:
self._ssock_r.setblocking(False)
self._ssock_w.setblocking(False)
self._ssock_set(self._ssock_w.fileno())
self._ssock_set(self._ssock_r.fileno(), self._ssock_w.fileno())
except Exception:
self._ssock_del()
self._ssock_r.close()
self._ssock_del(self._ssock_r.fileno())
self._ssock_w = None
self._ssock_r = None
raise

self._reader_add(self._ssock_r.fileno(), self._ssock_reader, (), _copy_context())

def _ssock_stop(self):
if not self._ssock_w:
raise RuntimeError('self-socket has not been setup')

self._reader_rem(self._ssock_r.fileno())
self._ssock_del()
self._ssock_r.close()
self._ssock_del(self._ssock_r.fileno())
self._ssock_w = None
self._ssock_r = None

def _ssock_reader(self):
while True:
try:
data = self._ssock_r.recv(4096)
if not data:
break
if not self._sig_listening:
continue
# ignore null bytes written by self wake
signums = list(filter(None, data))
if signums:
self._sig_loop_handled = True
for signum in signums:
self._signal_handle(signum)
except InterruptedError:
continue
except BlockingIOError:
break

def add_signal_handler(self, sig, callback, *args):
if not self.__is_main_thread():
raise ValueError('Signals can only be handled from the main thread')
Expand Down Expand Up @@ -810,12 +778,6 @@ def _signals_clear(self):

self._sig_clear()

def _signal_handle(self, signum):
self._sig_loop_handled = True
handled, cancelled = self._sig_handle(signum)
if handled and cancelled:
self.remove_signal_handler(signum)

#: task factory
def set_task_factory(self, factory):
self._task_factory = factory
Expand Down
Loading

0 comments on commit 29b6e3a

Please sign in to comment.