diff --git a/rocketry/application.py b/rocketry/application.py index 49475ffa..74e833bc 100644 --- a/rocketry/application.py +++ b/rocketry/application.py @@ -22,15 +22,66 @@ class _AppMixin: session: Session def task(self, start_cond=None, name=None, **kwargs): - "Create a task" + """Create a task + + Parameters + ---------- + start_cond : BaseCondition + Starting condition of the task. When evaluates + True, the task is set to run. + name : str + Name of the task. If not specified, the name + is derived by the task class. For function + tasks it is the name of the function. + **kwargs + Additional arguments passed to the task class. + + Examples + -------- + .. codeblock:: + + @app.task("daily") + def get_value(): + return "Hello" + """ return self.session.create_task(start_cond=start_cond, name=name, **kwargs) def param(self, name:Optional[str]=None): - "Set one session parameter (decorator)" + """Create a parameter (from function) + + Parameters + ---------- + name : str + Name of the parameter. + + Examples + -------- + .. codeblock:: + + @app.param("my_value") + def get_value(): + return "Hello" + """ return FuncParam(name, session=self.session) def cond(self, syntax: Union[str, Pattern, List[Union[str, Pattern]]]=None): - "Create a condition (decorator)" + """Create a custom condition (from function) + + Parameters + ---------- + syntax : str, regex pattern, optional + String expression for the condition. Used + for condition language. + + Examples + -------- + .. codeblock:: + + @app.cond() + def is_foo(): + return True or False + + """ return FuncCond(syntax=syntax, session=self.session, decor_return_func=False) def params(self, **kwargs): @@ -38,6 +89,7 @@ def params(self, **kwargs): self.session.parameters.update(kwargs) def include_grouper(self, group:'Grouper'): + """Include contents of a grouper to the application""" for task in group.session.tasks: if group.prefix: task.name = group.prefix + task.name @@ -49,7 +101,18 @@ def include_grouper(self, group:'Grouper'): self.session.parameters.update(group.session.parameters) class Rocketry(_AppMixin): - """Rocketry scheduling application""" + """Rocketry scheduling application + + Parameters + ---------- + session : Session, optional + Scheduling session. Created if not passed. + logger_repo: redbird.base.BaseRepo, optional + Repository for the log records. MemoryRepo + is created if not passed. + **kwargs + Keyword arguments passed to session creation. + """ def __init__(self, session:Session=None, logger_repo:Optional[BaseRepo]=None, execution=None, **kwargs): @@ -70,7 +133,7 @@ def run(self, debug=False): self.session.start() async def serve(self, debug=False): - "Run the scheduler" + "Async run the scheduler" self.session.config.debug = debug self.session.set_as_default() await self.session.serve() @@ -106,6 +169,26 @@ def _get_repo(self, repo:str): raise NotImplementedError(f"Repo creation for {repo} not implemented") class Grouper(_AppMixin): + """Task group + + This is a group of tasks (and other components) + that can be added to an application later. Useful + for bigger applications. + + Parameters + ---------- + prefix : str, optional + Prefix to add in front of the task names + when including the grouper to an app. + start_cond : BaseCondition, optional + Condition that is added to every tasks' start + condition in the group when including to an app. + Every task in the group must fulfill this + condition as well to start. + execution : str, optional + Execution of the tasks in the group if not specified + in the task. + """ def __init__(self, prefix:str=None, start_cond=None, execution=None): self.prefix = prefix diff --git a/rocketry/conditions/api.py b/rocketry/conditions/api.py index 9a31ac00..62992d9f 100644 --- a/rocketry/conditions/api.py +++ b/rocketry/conditions/api.py @@ -1,4 +1,10 @@ -from typing import Callable, Union +from typing import Callable, Type, Union +import datetime +try: + from typing import Literal +except ImportError: # pragma: no cover + from typing_extensions import Literal + from rocketry.conditions.scheduler import SchedulerStarted from rocketry.conditions.task.task import DependFailure, DependFinish, DependSuccess, TaskFailed, TaskFinished, TaskRunnable, TaskStarted, TaskSucceeded from rocketry.core import ( @@ -20,33 +26,33 @@ class TimeCondWrapper(BaseCondition): - def __init__(self, cls_cond, cls_period, **kwargs): + def __init__(self, cls_cond: Type, cls_period: Type, **kwargs): self._cls_cond = cls_cond self._cls_period = cls_period self._cond_kwargs = kwargs - def between(self, start, end): + def between(self, start: Union[str, int, float], end: Union[str, int, float]): period = self._cls_period(start, end) return self._get_cond(period) - def before(self, end): + def before(self, end: Union[str, int, float]): period = self._cls_period(None, end) return self._get_cond(period) - def after(self, start): + def after(self, start: Union[str, int, float]): period = self._cls_period(start, None) return self._get_cond(period) - def on(self, span): + def on(self, span: str): # Alias for "at" return self.at(span) - def at(self, span): + def at(self, span: str): # Alias for "on" period = self._cls_period(span, time_point=True) return self._get_cond(period) - def starting(self, start): + def starting(self, start: Union[str, int, float]): period = self._cls_period(start, start) return self._get_cond(period) @@ -68,11 +74,11 @@ def __init__(self, cls_cond, task=None): self.cls_cond = cls_cond self.task = task - def observe(self, **kwargs): + def observe(self, **kwargs) -> bool: cond = self.get_cond() return cond.observe(**kwargs) - def __call__(self, task): + def __call__(self, task: Union[str, Task, Callable]): return TimeActionWrapper(self.cls_cond, task=task) @property @@ -130,7 +136,7 @@ def get_cond(self): time_of_week = TimeCondWrapper(IsPeriod, TimeOfWeek) time_of_month = TimeCondWrapper(IsPeriod, TimeOfMonth) -def every(past:str, based="run"): +def every(past: str, based: Literal['run', 'success', 'fail', 'finish'] = "run"): kws_past = {} # 'unit': 's' if based == "run": return TaskStarted(period=TimeDelta(past, kws_past=kws_past)) == 0 @@ -143,7 +149,7 @@ def every(past:str, based="run"): else: raise ValueError(f"Invalid status: {based}") -def cron(__expr=None, **kwargs): +def cron(__expr: str = None, **kwargs: str): if __expr: args = __expr.split(" ") else: @@ -155,39 +161,41 @@ def cron(__expr=None, **kwargs): # Task pipelining # --------------- -def after_success(task): +def after_success(task: Union[str, Task, Callable]): return DependSuccess(depend_task=task) -def after_fail(task): +def after_fail(task: Union[str, Task, Callable]): return DependFailure(depend_task=task) -def after_finish(task): +def after_finish(task: Union[str, Task, Callable]): return DependFinish(depend_task=task) -def after_all_success(*tasks): +def after_all_success(*tasks: Union[str, Task, Callable]): return All(*(after_success(task) for task in tasks)) -def after_all_fail(*tasks): +def after_all_fail(*tasks: Union[str, Task, Callable]): return All(*(after_fail(task) for task in tasks)) -def after_all_finish(*tasks): +def after_all_finish(*tasks: Union[str, Task, Callable]): return All(*(after_finish(task) for task in tasks)) -def after_any_success(*tasks): +def after_any_success(*tasks: Union[str, Task, Callable]): return Any(*(after_success(task) for task in tasks)) -def after_any_fail(*tasks): +def after_any_fail(*tasks: Union[str, Task, Callable]): return Any(*(after_fail(task) for task in tasks)) -def after_any_finish(*tasks): +def after_any_finish(*tasks: Union[str, Task, Callable]): return Any(*(after_finish(task) for task in tasks)) # Task Status # ----------- -def running(more_than:str=None, less_than=None, task=None): +def running(more_than: Union[str, datetime.timedelta, float, int] = None, + less_than: Union[str, datetime.timedelta, float, int] = None, + task: Union[str, Task, Callable] = None): if more_than is not None or less_than is not None: period = TimeSpanDelta(near=more_than, far=less_than) else: @@ -202,5 +210,6 @@ def running(more_than:str=None, less_than=None, task=None): # Scheduler # --------- -def scheduler_running(more_than:str=None, less_than=None): +def scheduler_running(more_than: Union[str, datetime.timedelta, float, int] = None, + less_than: Union[str, datetime.timedelta, float, int] = None): return SchedulerStarted(period=TimeSpanDelta(near=more_than, far=less_than)) diff --git a/rocketry/core/schedule.py b/rocketry/core/schedule.py index 5cbf0693..59cb8866 100644 --- a/rocketry/core/schedule.py +++ b/rocketry/core/schedule.py @@ -22,50 +22,8 @@ from rocketry import Session class Scheduler(RedBase): - """Multiprocessing scheduler - - Parameters - ---------- - session : rocketry.session.Session, optional - Session object containing tasks, - parameters and settings, - by default None - max_processes : int, optional - Maximum number of processes - allowed to be started, - by default number of CPUs - tasks_as_daemon : bool, optional - Whether process tasks are run - as daemon (if not specified in - the task) or not, by default True - timeout : str, optional - Timeout of each task if not specified - in the task itself. Must be timedelta - string, by default "30 minutes" - parameters : [type], optional - Parameters of the session. - Can also be passed directly to the - session, by default None - logger : [type], optional - [description], by default None - name : str, optional - Name of the scheduler, deprecated, by default None - restarting : str, optional - How the scheduler is restarted if - Restart exception is raised, by default - "replace" - instant_shutdown : bool, optional - Whether the scheduler tries to shut down - as quickly as possible or wait till all - the tasks have finished, by default False - - Attributes - ---------- - session : rocketry.session.Session - Session for which the scheduler is - for. One session has only one - scheduler. - """ + """Rocketry's Scheduler""" + session: 'Session' def __init__(self, session=None, diff --git a/rocketry/core/task.py b/rocketry/core/task.py index 8adbdc67..b4068f97 100644 --- a/rocketry/core/task.py +++ b/rocketry/core/task.py @@ -50,9 +50,7 @@ class Task(RedBase, BaseModel): """Base class for Tasks. A task can be a function, command or other procedure that - does a specific thing. A task can be parametrized by supplying - session level parameters or parameters on task basis. - + does a specific thing. Parameters ---------- @@ -73,11 +71,24 @@ class Task(RedBase, BaseModel): tasks with execution='process' or 'thread' if thread termination is implemented in the task, by default AlwaysFalse() - execution : str, {'main', 'thread', 'process'}, default='process' - How the task is executed. Allowed values - 'main' (run on main thread & process), - 'thread' (run on another thread) and - 'process' (run on another process). + execution : str, {'main', 'async', 'thread', 'process'}, default='process' + How the task is executed. Allowed values: + + - ``main``: run on main thread & process and block till execution + - ``async``: run task asynchronously using asyncio + - ``thread``: run task on separate thread + - ``process``: run task on separate process + + priority : int, optional + Priority of the task. Higher priority + tasks are first inspected whether they + can be executed. Can be any numeric value. + By default 0 + timeout : str, int, timedelta, optional + If the task has not run in given timeout + the task will be terminated. Only applicable + for tasks with execution as async, thread or + process. parameters : Parameters, optional Parameters set specifically to the task, by default None @@ -88,38 +99,17 @@ class Task(RedBase, BaseModel): force_run : bool If True, the task will be run once regardless of the start_cond, - by default True + by default False on_startup : bool Run the task on the startup sequence of the Scheduler, by default False on_shutdown : bool Run the task on the shutdown sequence of the Scheduler, by default False - priority : int, optional - Priority of the task. Higher priority - tasks are first inspected whether they - can be executed. Can be any numeric value. - Setup tasks are recommended to have priority - >= 40 if they require loaded tasks, - >= 50 if they require loaded extensions. - By default 0 - timeout : str, int, timedelta, optional - If the task has not run in given timeout - the task will be terminated. Only applicable - for tasks with execution='process' or - with execution='thread'. daemon : Bool, optional Whether run the task as daemon process or not. Only applicable for execution='process', by default use Scheduler default - on_exists : str - What to do if the name of the task already - exists in the session, options: 'raise', - 'ignore', 'replace', by default use session - configuration - logger : str, logger.Logger, optional - Logger of the task. Typically not needed - to be set. session : rocketry.session.Session, optional Session the task is binded to. @@ -266,10 +256,10 @@ def __init__(self, **kwargs): # Set default readable logger if missing self.session._check_readable_logger() - self.register() + self._register() # Update "last_run", "last_success", etc. - self.set_cached() + self._set_cached() # Hooks hooker.postrun() @@ -331,22 +321,9 @@ def __call__(self, *args, **kwargs): self.start(*args, **kwargs) def start(self, *args, **kwargs): - return asyncio.run(self.start_async(*args, **kwargs)) - - async def start_async(self, params:Union[dict, Parameters]=None, **kwargs): - """Execute the task. Creates a new process - (if execution='process'), a new thread - (if execution='thread') or blocks and - runs till the task is completed (if - execution='main'). + return asyncio.run(self._start_async(*args, **kwargs)) - Parameters - ---------- - params : dict, Parameters, optional - Extra parameters for the task. Also - the session parameters, task parameters - and extra parameters are acquired, by default None - """ + async def _start_async(self, params:Union[dict, Parameters]=None, **kwargs): # Remove old threads/processes # (using _process and _threads are most robust way to check if running as process or thread) @@ -386,9 +363,9 @@ async def start_async(self, params:Union[dict, Parameters]=None, **kwargs): # in tests will succeed. time.sleep(1e-6) elif execution == "process": - self.run_as_process(params=params, **kwargs) + self._run_as_process(params=params, **kwargs) elif execution == "thread": - self.run_as_thread(params=params, **kwargs) + self._run_as_thread(params=params, **kwargs) except (SchedulerRestart, SchedulerExit): raise except Exception as exc: @@ -427,10 +404,10 @@ def is_runnable(self): return cond - def run_as_main(self, params:Parameters): - return self._run_as_main(params, self.parameters) + def _run_as_main(self, params:Parameters): + return self._run_in_main(params, self.parameters) - def _run_as_main(self, **kwargs): + def _run_in_main(self, **kwargs): return asyncio.run(self._run_as_async(**kwargs)) async def _run_as_async(self, params:Parameters, direct_params:Parameters, execution=None, **kwargs): @@ -524,7 +501,7 @@ async def _run_as_async(self, params:Parameters, direct_params:Parameters, execu # os.chdir(old_cwd) hooker.postrun(*exc_info) - def run_as_thread(self, params:Parameters, **kwargs): + def _run_as_thread(self, params:Parameters, **kwargs): """Create a new thread and run the task on that.""" params = params.pre_materialize(task=self, session=self.session) @@ -533,26 +510,26 @@ def run_as_thread(self, params:Parameters, **kwargs): self._thread_terminate.clear() event_is_running = threading.Event() - self._thread = threading.Thread(target=self._run_as_thread, args=(params, direct_params, event_is_running)) + self._thread = threading.Thread(target=self._run_in_thread, args=(params, direct_params, event_is_running)) self.last_run = datetime.datetime.fromtimestamp(time.time()) # Needed for termination self._thread.start() event_is_running.wait() # Wait until the task is confirmed to run - def _run_as_thread(self, params:Parameters, direct_params:Parameters, event=None): + def _run_in_thread(self, params:Parameters, direct_params:Parameters, event=None): """Running the task in a new thread. This method should only be run by the new thread.""" self.log_running() event.set() try: - output = self._run_as_main(params=params, direct_params=direct_params, execution="thread") + output = self._run_in_main(params=params, direct_params=direct_params, execution="thread") except: # Task crashed before actually running the execute. self.log_failure() # We cannot rely the exception to main thread here # thus we supress to prevent unnecessary warnings. - def run_as_process(self, params:Parameters, daemon=None, log_queue: multiprocessing.Queue=None): + def _run_as_process(self, params:Parameters, daemon=None, log_queue: multiprocessing.Queue=None): """Create a new process and run the task on that.""" session = self.session @@ -564,7 +541,7 @@ def run_as_process(self, params:Parameters, daemon=None, log_queue: multiprocess daemon = self.daemon if self.daemon is not None else session.config.tasks_as_daemon self._process = multiprocessing.Process( - target=self._run_as_process, + target=self._run_in_process, args=(params, direct_params, log_queue, session.config, self._get_hooks("task_execute")), daemon=daemon ) @@ -577,7 +554,7 @@ def run_as_process(self, params:Parameters, daemon=None, log_queue: multiprocess self._lock_to_run_log(log_queue) return log_queue - def _run_as_process(self, params:Parameters, direct_params:Parameters, queue, config, exec_hooks): + def _run_in_process(self, params:Parameters, direct_params:Parameters, queue, config, exec_hooks): """Running the task in a new process. This method should only be run by the new process.""" @@ -610,7 +587,7 @@ def _run_as_process(self, params:Parameters, direct_params:Parameters, queue, co try: # NOTE: The parameters are "materialized" # here in the actual process that runs the task - output = self._run_as_main(params=params, direct_params=direct_params, execution="process", hooks=exec_hooks) + output = self._run_in_main(params=params, direct_params=direct_params, execution="process", hooks=exec_hooks) except Exception as exc: # Task crashed before running execute (silence=True) self.log_failure() @@ -744,14 +721,14 @@ def is_running(self): """bool: Whether the task is currently running or not.""" return self.get_status() == "run" - def register(self): + def _register(self): if hasattr(self, "_mark_register") and not self._mark_register: del self._mark_register return # on_exists = 'ignore' name = self.name self.session.add_task(self) - def set_cached(self): + def _set_cached(self): "Update cached statuses" # We get the logger here to not flood with warnings if missing repo logger = self.logger @@ -785,10 +762,11 @@ def get_default_name(self, **kwargs): raise NotImplementedError(f"Method 'get_default_name' not implemented to {type(self)}") def is_alive(self) -> bool: - """Whether the task is alive: check if the task has a live process or thread.""" + """Whether the task is alive: check if the task has a live process, async task or thread.""" return self.is_alive_as_async() or self.is_alive_as_thread() or self.is_alive_as_process() def is_alive_as_async(self) -> bool: + """Whether the task has a live async task""" return self._async_task is not None and not self._async_task.done() def is_alive_as_thread(self) -> bool: @@ -863,7 +841,7 @@ def log_record(self, record:logging.LogRecord): self.logger.handle(record) self.status = record.action - def get_status(self) -> Literal['run', 'fail', 'success', 'terminate', 'inaction', None]: + def get_status(self) -> Literal['run', 'fail', 'success', 'terminate', 'inaction', 'crash', None]: """Get latest status of the task.""" if self.session.config.force_status_from_logs: try: @@ -903,7 +881,7 @@ def _set_status(self, action, message=None, return_value=None): if is_running_as_child and action == "success": # If child process, the return value is passed via QueueHandler to the main process # and it's handled then in Scheduler. - # Else the return value is handled in Task itself (__call__ & _run_as_thread) + # Else the return value is handled in Task itself (__call__ & _run_in_thread) extra["__return__"] = return_value log_method = self.logger.exception if action == "fail" else self.logger.info @@ -936,7 +914,7 @@ def get_last_inaction(self) -> datetime.datetime: return self._get_last_action("inaction") def get_last_crash(self) -> datetime.datetime: - """Get the lastest timestamp when the task inacted.""" + """Get the lastest timestamp when the task crashed.""" return self._get_last_action("crash") def get_execution(self) -> str: diff --git a/rocketry/session.py b/rocketry/session.py index 98778f98..f4717063 100644 --- a/rocketry/session.py +++ b/rocketry/session.py @@ -12,6 +12,8 @@ import warnings from pydantic import BaseModel, PrivateAttr, validator +from redbird.base import BaseRepo + from rocketry.pybox.time import to_timedelta from rocketry.log.defaults import create_default_handler from typing import TYPE_CHECKING, Callable, ClassVar, Iterable, Dict, List, Optional, Set, Tuple, Type, Union, Any @@ -91,7 +93,10 @@ class Hooks(BaseModel): scheduler_shutdown: List[Callable] = [] class Session(RedBase): - """Collection of the scheduler objects. + """Scheduling session + + This is a collection of scheduling session's + configurations and components. Parameters ---------- @@ -99,20 +104,8 @@ class Session(RedBase): config : dict, optional Central configuration for defining behaviour of different object and classes in the session. - tasks : Dict[str, rocketry.core.Task], optional - Tasks of the session. Can be formed later. parameters : parameter-like, optional Session level parameters. - scheme : str or list, optional - Premade scheme(s) to use to set up logging, - parameters, setup tasks etc. - as_default : bool, default=True - Whether to set the session as default for next - tasks etc. that don't have session - specified. - kwds_scheduler : dict, optional - Keyword arguments passed to - :py:class:`rocketry.core.Scheduler`. delete_existing_loggers : bool, default=False If True, deletes the loggers that already existed for the task logger basename. @@ -122,13 +115,13 @@ class Session(RedBase): config : dict Central configuration for defining behaviour of different object and classes in the session. + tasks : Set[Task] + Collection of the tasks in the scheduling session. + parameters : Parameters + Session's parameters that can be used in tasks. scheduler : Scheduler - Scheduler of the session. - delete_existing_loggers : bool - If True, all loggers that match the - session.config.basename are deleted (by - default, deletes loggers starting with - 'rocketry.task'). + Scheduler of the session. This is not meant to + be interacted by the user. """ config: Config = Config() @@ -237,10 +230,10 @@ def run(self, *task_names:Tuple[str], execution=None, obey_cond=False): Names of the tasks to run. execution : str Execution method for all of the tasks. - By default, whatever set to each task + By default, whatever set to each task. obey_cond : bool Whether to obey the ``start_cond`` or - force a run regardless. By default, False + force a run regardless. By default, False. .. warning:: @@ -281,16 +274,16 @@ def restart(self): """Restart the scheduler The restart is not instantenous and - will occur after the scheduler finishes - checking one cycle of tasks.""" + will occur when the scheduler finishes + the current cycle.""" self.scheduler._flag_restart.set() def shutdown(self): """Shut down the scheduler The shut down is not instantenous and - will occur after the scheduler finishes - checking one cycle of tasks.""" + will occur when the scheduler finishes + the current cycle.""" warnings.warn(( "Session.shutdown is deprecated. " "Please use Session.shut_down instead" @@ -298,7 +291,11 @@ def shutdown(self): self.scheduler._flag_shutdown.set() def shut_down(self, force=None): - """Shut down the scheduler""" + """Shut down the scheduler + + The shut down is not instantenous and + will occur when the scheduler finishes + the current cycle.""" force = force if force is not None else self.scheduler._flag_shutdown.is_set() self.scheduler._flag_shutdown.set() if force: @@ -349,7 +346,25 @@ def get_cond_parsers(self): return self._cond_parsers def create_task(self, *, command=None, path=None, **kwargs): - "Create a task and put it to the session" + """Create a task and put it to the session + + The task type depends on the passed keyword arguments: + + - If ``command`` is passed, ``CommandTask`` is created + - If ``path`` is passed, lazy ``FuncTask`` is created + (the function is imported only when the task is executed)ยจ + - Else the task is considered to be a decorated ``FuncTask`` + + Parameters + ---------- + command : tuple of str + Names of the tasks to run. + path : str + Execution method for all of the tasks. + By default, whatever set to each task. + **kwargs + Passed to the task initiation. + """ # To avoid circular imports from rocketry.tasks import CommandTask, FuncTask @@ -365,7 +380,13 @@ def create_task(self, *, command=None, path=None, **kwargs): return FuncTask(name_include_module=False, _name_template='{func_name}', **kwargs) def add_task(self, task: 'Task'): - "Add the task to the session" + """Add one task to the session + + Parameters + ---------- + task : Task + A task to add to the session. + """ if_exists = self.config.task_pre_exist exists = task in self if exists: @@ -383,6 +404,13 @@ def add_task(self, task: 'Task'): task.session = self def remove_task(self, task: Union['Task', str]): + """Remove one task from the session + + Parameters + ---------- + task : Task, str + The task to be removed or the name of the task. + """ if isinstance(task, str): task = self[task] self.session.tasks.remove(task) @@ -400,7 +428,7 @@ def task_exists(self, task: 'Task'): else: return False - def get_repo(self): + def get_repo(self) -> BaseRepo: "Get log repo where the task logs are stored" from rocketry.core.log import TaskAdapter basename = self.config.task_logger_basename diff --git a/rocketry/tasks/command.py b/rocketry/tasks/command.py index f27a1a59..ac0c6372 100644 --- a/rocketry/tasks/command.py +++ b/rocketry/tasks/command.py @@ -22,11 +22,14 @@ class CommandTask(Task): command : str, list Command to execute. cwd : str, optional - Sets the current directory before the child is executed. + Sets the current directory before the command is executed. shell : bool, optional If true, the command will be executed through the shell. kwds_popen : dict, optional Keyword arguments to be passed to subprocess.Popen + argform : {'short', '-', 'long', '--'} + Whether parameters are passed to the command as long form + (``--myparam 'a value'``) or as short form (``-myparam 'a value'``). **kwargs : dict See :py:class:`rocketry.core.Task` @@ -34,11 +37,11 @@ class CommandTask(Task): -------- >>> from rocketry.tasks import CommandTask - >>> task = CommandTask("python -m pip install rocketry", name="my_cmd_task_1") + >>> task = CommandTask("python -m pip install rocketry") Or list of commands: - >>> task = CommandTask(["python", "-m", "pip", "install", "rocketry"], name="my_cmd_task_2") + >>> task = CommandTask(["python", "-m", "pip", "install", "rocketry"]) """ command: Union[str, List[str]] diff --git a/rocketry/tasks/func.py b/rocketry/tasks/func.py index 380bac27..eb8709ac 100644 --- a/rocketry/tasks/func.py +++ b/rocketry/tasks/func.py @@ -58,23 +58,30 @@ def __exit__(self, exc_type, exc_val, exc_tb): class FuncTask(Task): """Task that executes a function or callable. + There are three ways to initiate ``FuncTask``: + + - Using decorator (``@FuncTask(...)``) + - Passsing a function (``FuncTask(func=do_things, ...)``) + - Passsing a name of a function (``FuncTask(func_name="main", path="path/to/file.py", ...)``) + Parameters ---------- - func : Callable, str - Function or name of a function to be executed. If string is - passed, the path to the file where the function is should - be passed with ``path`` or in the argument, like - "path/to/file.py:my_func". - path : path-like - Path to the function. Not needed if ``func`` is callable. - delay : bool, optional - If True, the function is imported and set to the task - immediately. If False, the function is imported only - when running the task. By default False if ``func`` is - callable and True if ``func`` is a name of a function. + func : Callable, optional + Function or callable that is to be executed as part of the + task. + func_name : str, optional + Name of the function in the file defined in ``path`` that is + to be executed. + path : path-like, optional + Path to the Python file where the function is. + Not needed if ``func`` was passed. sys_path : list of paths Paths that are appended to ``sys.path`` when the function is imported. + cache : bool + Whether to store the function or callable or reload it + every time the task is executed. Only used if ``func`` + was not specified. **kwargs : dict See :py:class:`rocketry.core.Task` @@ -85,28 +92,25 @@ class FuncTask(Task): >>> from rocketry.tasks import FuncTask >>> def myfunc(): ... ... - >>> task = FuncTask(myfunc, name="my_func_task_1") + >>> task = FuncTask(myfunc) **Via decorator:** >>> from rocketry.tasks import FuncTask - >>> @FuncTask(name='my_func_task_2', start_cond="daily") + >>> @FuncTask() ... def myfunc(): ... ... - If the ``name`` is not defined, the name will be in form - ``path.to.module:myfunc``. - - Or from string using lazy importing: + **From another file:** >>> from rocketry.tasks import FuncTask - >>> task = FuncTask("myfunc", path="path/to/script.py", name='my_func_task_3', start_cond="daily") + >>> task = FuncTask(func_name="myfunc", path="path/to/script.py") Warnings -------- If ``execution='process'``, only picklable functions can be used. - The following will NOT work: + The following might not work: .. code-block:: python