diff --git a/docs/changelog/changes_08.rst b/docs/changelog/changes_08.rst index 34d1bd5..9fc5e13 100644 --- a/docs/changelog/changes_08.rst +++ b/docs/changelog/changes_08.rst @@ -40,6 +40,8 @@ Changes in 0.8 } } +- Drop ``loop`` parameter from ``hiku.executors.asyncio.AsyncIOExecutor`` constructor. + Backward-incompatible changes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/hiku/executors/asyncio.py b/hiku/executors/asyncio.py index 0c5d0b2..bf48c10 100644 --- a/hiku/executors/asyncio.py +++ b/hiku/executors/asyncio.py @@ -1,32 +1,19 @@ import inspect - from asyncio import ( - wait, FIRST_COMPLETED, - gather, CancelledError, Task, - AbstractEventLoop, -) -from asyncio import get_event_loop -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Coroutine, - Optional, - cast, + gather, + get_running_loop, + wait, ) +from typing import TYPE_CHECKING, Any, Callable, Coroutine, cast from hiku.executors.base import BaseAsyncExecutor from hiku.result import Proxy - if TYPE_CHECKING: - from hiku.executors.queue import ( - Queue, - Workflow, - ) + from hiku.executors.queue import Queue, Workflow class AsyncIOExecutor(BaseAsyncExecutor): @@ -35,15 +22,11 @@ class AsyncIOExecutor(BaseAsyncExecutor): By default it allows to run both synchronous and asynchronous tasks. To deny synchronous tasks set deny_sync to True. - :param loop: asyncio event loop :param deny_sync: deny synchronous tasks - raise TypeError if a task is not awaitable """ - def __init__( - self, loop: Optional[AbstractEventLoop] = None, deny_sync: bool = False - ) -> None: - self.loop = loop or get_event_loop() + def __init__(self, deny_sync: bool = False) -> None: self.deny_sync = deny_sync async def _wrapper(self, fn: Callable, *args: Any, **kwargs: Any) -> Any: @@ -54,6 +37,8 @@ async def _wrapper(self, fn: Callable, *args: Any, **kwargs: Any) -> Any: return result def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Task: + loop = get_running_loop() + coro = fn(*args, **kwargs) if not inspect.isawaitable(coro): if self.deny_sync: @@ -61,10 +46,10 @@ def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Task: "{!r} returned non-awaitable object {!r}".format(fn, coro) ) - return self.loop.create_task(self._wrapper(fn, *args, **kwargs)) + return loop.create_task(self._wrapper(fn, *args, **kwargs)) coro = cast(Coroutine, coro) - return self.loop.create_task(coro) + return loop.create_task(coro) async def process(self, queue: "Queue", workflow: "Workflow") -> Proxy: try: diff --git a/tests/test_executor_asyncio.py b/tests/test_executor_asyncio.py index 6d2d692..1dff56f 100644 --- a/tests/test_executor_asyncio.py +++ b/tests/test_executor_asyncio.py @@ -2,8 +2,8 @@ import pytest -from hiku.executors.queue import Queue, Workflow from hiku.executors.asyncio import AsyncIOExecutor +from hiku.executors.queue import Queue, Workflow def func(): @@ -28,8 +28,7 @@ async def coroutine(): @pytest.mark.asyncio async def test_awaitable_check__async_only(): - loop = asyncio.get_running_loop() - executor = AsyncIOExecutor(loop, deny_sync=True) + executor = AsyncIOExecutor(deny_sync=True) with pytest.raises(TypeError) as func_err: executor.submit(func) @@ -52,8 +51,7 @@ async def test_awaitable_check__async_only(): @pytest.mark.asyncio async def test_awaitable_check__sync_async(): - loop = asyncio.get_running_loop() - executor = AsyncIOExecutor(loop) + executor = AsyncIOExecutor() assert await executor.submit(func) is None assert await executor.submit(func2) == [] @@ -79,7 +77,7 @@ class TestWorkflow(Workflow): def result(self): raise AssertionError("impossible") - executor = AsyncIOExecutor(loop) + executor = AsyncIOExecutor() queue = Queue(executor) queue.submit(queue.fork(None), proc)