Skip to content

Commit

Permalink
3.12
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeshardmind committed Oct 26, 2024
1 parent 62bd8af commit 58c50be
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 22 deletions.
2 changes: 1 addition & 1 deletion async_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# limitations under the License.


__version__ = "6.0.1"
__version__ = "7.0.0"
14 changes: 8 additions & 6 deletions async_utils/bg_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
from collections.abc import Awaitable, Generator
from concurrent.futures import Future
from contextlib import contextmanager
from typing import Any, TypeAlias, TypeVar
from typing import Any, TypeVar

_T = TypeVar("_T")

_FutureLike: TypeAlias = asyncio.Future[_T] | Awaitable[_T]
type _FutureLike[_T] = asyncio.Future[_T] | Awaitable[_T]

__all__ = ["threaded_loop"]

Expand Down Expand Up @@ -52,12 +52,14 @@ def future_done_callback(_f: Future[Any]) -> object:
return future.result()


def run_forever(loop: asyncio.AbstractEventLoop) -> None:
def run_forever(loop: asyncio.AbstractEventLoop, use_eager_task_factory: bool, /) -> None:
asyncio.set_event_loop(loop)
if use_eager_task_factory:
loop.set_task_factory(asyncio.eager_task_factory)
try:
loop.run_forever()
finally:
loop.run_until_complete(asyncio.sleep(0.05))
loop.run_until_complete(asyncio.sleep(0))
tasks: set[asyncio.Task[Any]] = {t for t in asyncio.all_tasks(loop) if not t.done()}
for t in tasks:
t.cancel()
Expand All @@ -83,7 +85,7 @@ def run_forever(loop: asyncio.AbstractEventLoop) -> None:


@contextmanager
def threaded_loop() -> Generator[LoopWrapper, None, None]:
def threaded_loop(*, use_eager_task_factory: bool = True) -> Generator[LoopWrapper, None, None]:
"""Starts an event loop on a background thread,
and yields an object with scheduling methods for interacting with
the loop.
Expand All @@ -92,7 +94,7 @@ def threaded_loop() -> Generator[LoopWrapper, None, None]:
loop = asyncio.new_event_loop()
thread = None
try:
thread = threading.Thread(target=run_forever, args=(loop,))
thread = threading.Thread(target=run_forever, args=(loop, use_eager_task_factory))
thread.start()
yield LoopWrapper(loop)
finally:
Expand Down
6 changes: 3 additions & 3 deletions async_utils/bg_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
from __future__ import annotations

import asyncio
from collections.abc import Coroutine, Generator
from collections.abc import Coroutine
from contextvars import Context
from types import TracebackType
from typing import Any, Self, TypeAlias, TypeVar
from typing import Any, Self, TypeVar

_T = TypeVar("_T")
_CoroutineLike: TypeAlias = Generator[Any, None, _T] | Coroutine[Any, Any, _T]
type _CoroutineLike[_T] = Coroutine[Any, Any, _T]


__all__ = ["BGTasks"]
Expand Down
7 changes: 3 additions & 4 deletions async_utils/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,19 @@
from functools import total_ordering
from time import time
from types import TracebackType
from typing import Any, Generic, TypeVar
from typing import Any

__all__ = ("Scheduler",)

MISSING: Any = object()
T = TypeVar("T")


class CancelationToken:
__slots__ = ()


@total_ordering
class _Task(Generic[T]):
class _Task[T]:
__slots__ = ("timestamp", "payload", "canceled", "cancel_token")

def __init__(self, timestamp: float, payload: T, /):
Expand All @@ -44,7 +43,7 @@ def __lt__(self, other: _Task[T]):
return (self.timestamp, id(self)) < (other.timestamp, id(self))


class Scheduler(Generic[T]):
class Scheduler[T]:
__tasks: dict[CancelationToken, _Task[T]]
__tqueue: asyncio.PriorityQueue[_Task[T]]
__closed: bool
Expand Down
6 changes: 2 additions & 4 deletions async_utils/waterfall.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
import asyncio
import time
from collections.abc import Callable, Coroutine, Sequence
from typing import Any, Generic, Literal, TypeVar, overload

T = TypeVar("T")
from typing import Any, Literal, overload

__all__ = ("Waterfall",)


class Waterfall(Generic[T]):
class Waterfall[T]:
"""Class for batch event scheduling based on recurring intervals,
with a quanity threshold which overrides the interval.
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ exclude = [
"docs",
]

pythonVersion = "3.11"
pythonVersion = "3.12"
typeCheckingMode = "strict"
pythonPlatform = "All"
reportImportCycles = "error"
Expand All @@ -24,7 +24,7 @@ reportUnnecessaryTypeIgnoreComment = "warning"
[tool.ruff]

line-length = 130
target-version = "py311"
target-version = "py312"

[tool.ruff.format]
line-ending = "lf"
Expand Down
3 changes: 1 addition & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ classifiers =
Natural Language :: English
Typing :: Typed
Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: 3.11
Programming Language :: Python :: 3.12
Programming Language :: Python :: 3.13
license_files =
Expand All @@ -27,7 +26,7 @@ license_files =

[options]
packages = find_namespace:
python_requires = >=3.11.0
python_requires = >=3.12.0
include_package_data = True

[options.package_data]
Expand Down

0 comments on commit 58c50be

Please sign in to comment.