Skip to content

Commit

Permalink
feat: Add synchronous executor (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
tibordp authored Jun 25, 2020
1 parent f3b39e8 commit 4a954ed
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
3 changes: 3 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ Persistent distributed cron using Redis (coordinates execution with parallel ins
app.main()
See the `examples` directory for more examples of usage.


Development
===========

Expand Down
22 changes: 21 additions & 1 deletion src/pyncette/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,34 @@
logger = logging.getLogger(__name__)


class SynchronousExecutor(contextlib.AbstractAsyncContextManager):
def __init__(self, **kwargs: Dict[str, Any]):
pass

async def __aenter__(self) -> SynchronousExecutor:
return self

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[Any],
) -> None:
pass

async def spawn_task(self, task: Awaitable) -> None:
await task


class DefaultExecutor(contextlib.AbstractAsyncContextManager):
"""Manages the spawned tasks running in background"""

_tasks: Dict[object, asyncio.Task]
_semaphore: asyncio.Semaphore

def __init__(self, concurrency_limit: int) -> None:
def __init__(self, **kwargs: Any) -> None:
self._tasks = dict()
concurrency_limit = kwargs.get("concurrency_limit", 100)
self._semaphore = asyncio.Semaphore(concurrency_limit)

async def __aenter__(self) -> DefaultExecutor:
Expand Down
11 changes: 6 additions & 5 deletions src/pyncette/pyncette.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Type

import coloredlogs
import dateutil.tz
Expand Down Expand Up @@ -228,21 +229,21 @@ class Pyncette:
_middlewares: List[MiddlewareFunc]
_repository_factory: RepositoryFactory
_poll_interval: datetime.timedelta
_concurrency_limit: int
_executor_cls: Type
_configuration: Dict[str, Any]

def __init__(
self,
repository_factory: RepositoryFactory = sqlite_repository,
executor_cls: Type = DefaultExecutor,
poll_interval: datetime.timedelta = datetime.timedelta(seconds=1),
concurrency_limit: int = 100,
**kwargs: Any,
) -> None:
self._tasks = []
self._fixtures = []
self._middlewares = []
self._poll_interval = poll_interval
self._concurrency_limit = concurrency_limit
self._executor_cls = executor_cls
self._repository_factory = repository_factory
self._configuration = kwargs

Expand Down Expand Up @@ -310,8 +311,8 @@ async def create(self) -> AsyncIterator[PyncetteContext]:
"""Creates the execution context."""
async with self._repository_factory(
**self._configuration
) as repository, DefaultExecutor(
self._concurrency_limit
) as repository, self._executor_cls(
**self._configuration
) as executor, contextlib.AsyncExitStack() as stack:
root_context = await self._create_root_context(repository, stack)

Expand Down

0 comments on commit 4a954ed

Please sign in to comment.