Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docs/update docstrings #82

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 88 additions & 5 deletions rocketry/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,74 @@ class _AppMixin:
session: Session

def task(self, start_cond=None, name=None, **kwargs):
"Create a task"
"""Create a task

Parameters
----------
start_cond : BaseCondition
Starting condition of the task. When evaluates
True, the task is set to run.
name : str
Name of the task. If not specified, the name
is derived by the task class. For function
tasks it is the name of the function.
**kwargs
Additional arguments passed to the task class.

Examples
--------
.. codeblock::

@app.task("daily")
def get_value():
return "Hello"
"""
return self.session.create_task(start_cond=start_cond, name=name, **kwargs)

def param(self, name:Optional[str]=None):
"Set one session parameter (decorator)"
"""Create a parameter (from function)

Parameters
----------
name : str
Name of the parameter.

Examples
--------
.. codeblock::

@app.param("my_value")
def get_value():
return "Hello"
"""
return FuncParam(name, session=self.session)

def cond(self, syntax: Union[str, Pattern, List[Union[str, Pattern]]]=None):
"Create a condition (decorator)"
"""Create a custom condition (from function)

Parameters
----------
syntax : str, regex pattern, optional
String expression for the condition. Used
for condition language.

Examples
--------
.. codeblock::

@app.cond()
def is_foo():
return True or False

"""
return FuncCond(syntax=syntax, session=self.session, decor_return_func=False)

def params(self, **kwargs):
"Set session parameters"
self.session.parameters.update(kwargs)

def include_grouper(self, group:'Grouper'):
"""Include contents of a grouper to the application"""
for task in group.session.tasks:
if group.prefix:
task.name = group.prefix + task.name
Expand All @@ -49,7 +101,18 @@ def include_grouper(self, group:'Grouper'):
self.session.parameters.update(group.session.parameters)

class Rocketry(_AppMixin):
"""Rocketry scheduling application"""
"""Rocketry scheduling application

Parameters
----------
session : Session, optional
Scheduling session. Created if not passed.
logger_repo: redbird.base.BaseRepo, optional
Repository for the log records. MemoryRepo
is created if not passed.
**kwargs
Keyword arguments passed to session creation.
"""

def __init__(self, session:Session=None, logger_repo:Optional[BaseRepo]=None, execution=None, **kwargs):

Expand All @@ -70,7 +133,7 @@ def run(self, debug=False):
self.session.start()

async def serve(self, debug=False):
"Run the scheduler"
"Async run the scheduler"
self.session.config.debug = debug
self.session.set_as_default()
await self.session.serve()
Expand Down Expand Up @@ -106,6 +169,26 @@ def _get_repo(self, repo:str):
raise NotImplementedError(f"Repo creation for {repo} not implemented")

class Grouper(_AppMixin):
"""Task group

This is a group of tasks (and other components)
that can be added to an application later. Useful
for bigger applications.

Parameters
----------
prefix : str, optional
Prefix to add in front of the task names
when including the grouper to an app.
start_cond : BaseCondition, optional
Condition that is added to every tasks' start
condition in the group when including to an app.
Every task in the group must fulfill this
condition as well to start.
execution : str, optional
Execution of the tasks in the group if not specified
in the task.
"""

def __init__(self, prefix:str=None, start_cond=None, execution=None):
self.prefix = prefix
Expand Down
55 changes: 32 additions & 23 deletions rocketry/conditions/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
from typing import Callable, Union
from typing import Callable, Type, Union
import datetime
try:
from typing import Literal
except ImportError: # pragma: no cover
from typing_extensions import Literal

from rocketry.conditions.scheduler import SchedulerStarted
from rocketry.conditions.task.task import DependFailure, DependFinish, DependSuccess, TaskFailed, TaskFinished, TaskRunnable, TaskStarted, TaskSucceeded
from rocketry.core import (
Expand All @@ -20,33 +26,33 @@

class TimeCondWrapper(BaseCondition):

def __init__(self, cls_cond, cls_period, **kwargs):
def __init__(self, cls_cond: Type, cls_period: Type, **kwargs):
self._cls_cond = cls_cond
self._cls_period = cls_period
self._cond_kwargs = kwargs

def between(self, start, end):
def between(self, start: Union[str, int, float], end: Union[str, int, float]):
period = self._cls_period(start, end)
return self._get_cond(period)

def before(self, end):
def before(self, end: Union[str, int, float]):
period = self._cls_period(None, end)
return self._get_cond(period)

def after(self, start):
def after(self, start: Union[str, int, float]):
period = self._cls_period(start, None)
return self._get_cond(period)

def on(self, span):
def on(self, span: str):
# Alias for "at"
return self.at(span)

def at(self, span):
def at(self, span: str):
# Alias for "on"
period = self._cls_period(span, time_point=True)
return self._get_cond(period)

def starting(self, start):
def starting(self, start: Union[str, int, float]):
period = self._cls_period(start, start)
return self._get_cond(period)

Expand All @@ -68,11 +74,11 @@ def __init__(self, cls_cond, task=None):
self.cls_cond = cls_cond
self.task = task

def observe(self, **kwargs):
def observe(self, **kwargs) -> bool:
cond = self.get_cond()
return cond.observe(**kwargs)

def __call__(self, task):
def __call__(self, task: Union[str, Task, Callable]):
return TimeActionWrapper(self.cls_cond, task=task)

@property
Expand Down Expand Up @@ -130,7 +136,7 @@ def get_cond(self):
time_of_week = TimeCondWrapper(IsPeriod, TimeOfWeek)
time_of_month = TimeCondWrapper(IsPeriod, TimeOfMonth)

def every(past:str, based="run"):
def every(past: str, based: Literal['run', 'success', 'fail', 'finish'] = "run"):
kws_past = {} # 'unit': 's'
if based == "run":
return TaskStarted(period=TimeDelta(past, kws_past=kws_past)) == 0
Expand All @@ -143,7 +149,7 @@ def every(past:str, based="run"):
else:
raise ValueError(f"Invalid status: {based}")

def cron(__expr=None, **kwargs):
def cron(__expr: str = None, **kwargs: str):
if __expr:
args = __expr.split(" ")
else:
Expand All @@ -155,39 +161,41 @@ def cron(__expr=None, **kwargs):
# Task pipelining
# ---------------

def after_success(task):
def after_success(task: Union[str, Task, Callable]):
return DependSuccess(depend_task=task)

def after_fail(task):
def after_fail(task: Union[str, Task, Callable]):
return DependFailure(depend_task=task)

def after_finish(task):
def after_finish(task: Union[str, Task, Callable]):
return DependFinish(depend_task=task)


def after_all_success(*tasks):
def after_all_success(*tasks: Union[str, Task, Callable]):
return All(*(after_success(task) for task in tasks))

def after_all_fail(*tasks):
def after_all_fail(*tasks: Union[str, Task, Callable]):
return All(*(after_fail(task) for task in tasks))

def after_all_finish(*tasks):
def after_all_finish(*tasks: Union[str, Task, Callable]):
return All(*(after_finish(task) for task in tasks))


def after_any_success(*tasks):
def after_any_success(*tasks: Union[str, Task, Callable]):
return Any(*(after_success(task) for task in tasks))

def after_any_fail(*tasks):
def after_any_fail(*tasks: Union[str, Task, Callable]):
return Any(*(after_fail(task) for task in tasks))

def after_any_finish(*tasks):
def after_any_finish(*tasks: Union[str, Task, Callable]):
return Any(*(after_finish(task) for task in tasks))

# Task Status
# -----------

def running(more_than:str=None, less_than=None, task=None):
def running(more_than: Union[str, datetime.timedelta, float, int] = None,
less_than: Union[str, datetime.timedelta, float, int] = None,
task: Union[str, Task, Callable] = None):
if more_than is not None or less_than is not None:
period = TimeSpanDelta(near=more_than, far=less_than)
else:
Expand All @@ -202,5 +210,6 @@ def running(more_than:str=None, less_than=None, task=None):
# Scheduler
# ---------

def scheduler_running(more_than:str=None, less_than=None):
def scheduler_running(more_than: Union[str, datetime.timedelta, float, int] = None,
less_than: Union[str, datetime.timedelta, float, int] = None):
return SchedulerStarted(period=TimeSpanDelta(near=more_than, far=less_than))
46 changes: 2 additions & 44 deletions rocketry/core/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,50 +22,8 @@
from rocketry import Session

class Scheduler(RedBase):
"""Multiprocessing scheduler

Parameters
----------
session : rocketry.session.Session, optional
Session object containing tasks,
parameters and settings,
by default None
max_processes : int, optional
Maximum number of processes
allowed to be started,
by default number of CPUs
tasks_as_daemon : bool, optional
Whether process tasks are run
as daemon (if not specified in
the task) or not, by default True
timeout : str, optional
Timeout of each task if not specified
in the task itself. Must be timedelta
string, by default "30 minutes"
parameters : [type], optional
Parameters of the session.
Can also be passed directly to the
session, by default None
logger : [type], optional
[description], by default None
name : str, optional
Name of the scheduler, deprecated, by default None
restarting : str, optional
How the scheduler is restarted if
Restart exception is raised, by default
"replace"
instant_shutdown : bool, optional
Whether the scheduler tries to shut down
as quickly as possible or wait till all
the tasks have finished, by default False

Attributes
----------
session : rocketry.session.Session
Session for which the scheduler is
for. One session has only one
scheduler.
"""
"""Rocketry's Scheduler"""

session: 'Session'

def __init__(self, session=None,
Expand Down
Loading