Skip to content

Commit

Permalink
Merge pull request #175 from evo-company/use-get-running-loop-for-asy…
Browse files Browse the repository at this point in the history
…ncio-executor

Use asyncio.get_running_loop for AsyncIOExecutor as recommended by asyncio docs
  • Loading branch information
skovbasa authored Nov 19, 2024
2 parents b9f382a + 168bbdb commit c599472
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 31 deletions.
2 changes: 2 additions & 0 deletions docs/changelog/changes_08.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ Changes in 0.8
}
}
- Drop ``loop`` parameter from ``hiku.executors.asyncio.AsyncIOExecutor`` constructor.
Backward-incompatible changes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
35 changes: 10 additions & 25 deletions hiku/executors/asyncio.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,19 @@
import inspect

from asyncio import (
wait,
FIRST_COMPLETED,
gather,
CancelledError,
Task,
AbstractEventLoop,
)
from asyncio import get_event_loop
from typing import (
TYPE_CHECKING,
Any,
Callable,
Coroutine,
Optional,
cast,
gather,
get_running_loop,
wait,
)
from typing import TYPE_CHECKING, Any, Callable, Coroutine, cast

from hiku.executors.base import BaseAsyncExecutor
from hiku.result import Proxy


if TYPE_CHECKING:
from hiku.executors.queue import (
Queue,
Workflow,
)
from hiku.executors.queue import Queue, Workflow


class AsyncIOExecutor(BaseAsyncExecutor):
Expand All @@ -35,15 +22,11 @@ class AsyncIOExecutor(BaseAsyncExecutor):
By default it allows to run both synchronous and asynchronous tasks.
To deny synchronous tasks set deny_sync to True.
:param loop: asyncio event loop
:param deny_sync: deny synchronous tasks -
raise TypeError if a task is not awaitable
"""

def __init__(
self, loop: Optional[AbstractEventLoop] = None, deny_sync: bool = False
) -> None:
self.loop = loop or get_event_loop()
def __init__(self, deny_sync: bool = False) -> None:
self.deny_sync = deny_sync

async def _wrapper(self, fn: Callable, *args: Any, **kwargs: Any) -> Any:
Expand All @@ -54,17 +37,19 @@ async def _wrapper(self, fn: Callable, *args: Any, **kwargs: Any) -> Any:
return result

def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Task:
loop = get_running_loop()

coro = fn(*args, **kwargs)
if not inspect.isawaitable(coro):
if self.deny_sync:
raise TypeError(
"{!r} returned non-awaitable object {!r}".format(fn, coro)
)

return self.loop.create_task(self._wrapper(fn, *args, **kwargs))
return loop.create_task(self._wrapper(fn, *args, **kwargs))

coro = cast(Coroutine, coro)
return self.loop.create_task(coro)
return loop.create_task(coro)

async def process(self, queue: "Queue", workflow: "Workflow") -> Proxy:
try:
Expand Down
10 changes: 4 additions & 6 deletions tests/test_executor_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import pytest

from hiku.executors.queue import Queue, Workflow
from hiku.executors.asyncio import AsyncIOExecutor
from hiku.executors.queue import Queue, Workflow


def func():
Expand All @@ -28,8 +28,7 @@ async def coroutine():

@pytest.mark.asyncio
async def test_awaitable_check__async_only():
loop = asyncio.get_running_loop()
executor = AsyncIOExecutor(loop, deny_sync=True)
executor = AsyncIOExecutor(deny_sync=True)

with pytest.raises(TypeError) as func_err:
executor.submit(func)
Expand All @@ -52,8 +51,7 @@ async def test_awaitable_check__async_only():

@pytest.mark.asyncio
async def test_awaitable_check__sync_async():
loop = asyncio.get_running_loop()
executor = AsyncIOExecutor(loop)
executor = AsyncIOExecutor()

assert await executor.submit(func) is None
assert await executor.submit(func2) == []
Expand All @@ -79,7 +77,7 @@ class TestWorkflow(Workflow):
def result(self):
raise AssertionError("impossible")

executor = AsyncIOExecutor(loop)
executor = AsyncIOExecutor()

queue = Queue(executor)
queue.submit(queue.fork(None), proc)
Expand Down

0 comments on commit c599472

Please sign in to comment.