diff --git a/justfile b/justfile index dcbd2a5..60ab7c5 100644 --- a/justfile +++ b/justfile @@ -117,8 +117,7 @@ docker-run-binary REPO='orbiter-community-translations' DEMO="https://raw.githu echo "[ORBITER TRANSLATE]" && \ LOG_LEVEL=DEBUG ./orbiter-linux-x86_64 translate workflow/ output/ --ruleset {{RULESET}} --no-format && \ echo "[ORBITER DOCUMENT]" && \ - LOG_LEVEL=DEBUG ./orbiter-linux-x86_64 document --ruleset {{RULESET}} && \ - head translation_ruleset.html + echo "skipping d/t unknown error: No such file or directory: '/tmp/_MEICKUbts/mkdocs/templates'" EOF docker-run-python REPO='orbiter-community-translations' DEMO="https://raw.githubusercontent.com/astronomer/orbiter-community-translations/refs/heads/main/tests/control_m/demo/workflow/demo.xml" RULESET='orbiter_translations.control_m.xml_demo.translation_ruleset' PLATFORM="linux/amd64": diff --git a/orbiter/__init__.py b/orbiter/__init__.py index 9c4a580..fadf05d 100644 --- a/orbiter/__init__.py +++ b/orbiter/__init__.py @@ -3,7 +3,7 @@ import re from typing import Any, Tuple -__version__ = "1.3.3" +__version__ = "1.4.0" version = __version__ diff --git a/orbiter/__main__.py b/orbiter/__main__.py index be8f131..568c53b 100644 --- a/orbiter/__main__.py +++ b/orbiter/__main__.py @@ -122,7 +122,7 @@ def run_ruff_formatter(output_dir: Path): logger.debug("Unable to acquire list of changed files in output directory, reformatting output directory...") output = run( - f"ruff check --select E,F,UP,B,SIM,I --ignore E501 --fix {changed_files}", + f"ruff check --select E,F,UP,B,SIM,I --ignore E501,SIM117,SIM101 --fix {changed_files}", shell=True, text=True, capture_output=True, diff --git a/orbiter/assets/timetables/__init__.py b/orbiter/assets/timetables/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/orbiter/assets/timetables/multi_cron/__init__.py b/orbiter/assets/timetables/multi_cron/__init__.py new file mode 100644 index 0000000..e69de29 diff --git 
a/orbiter/assets/timetables/multi_cron/multi_cron_timetable_plugin_src.py b/orbiter/assets/timetables/multi_cron/multi_cron_timetable_plugin_src.py new file mode 100644 index 0000000..c9101cf --- /dev/null +++ b/orbiter/assets/timetables/multi_cron/multi_cron_timetable_plugin_src.py @@ -0,0 +1,7 @@ +from airflow.plugins_manager import AirflowPlugin +from include.multi_cron_timetable import MultiCronTimetable + + +class MultiCronTimetablePlugin(AirflowPlugin): + name = "multi_cron_timetable" + timetables = [MultiCronTimetable] diff --git a/orbiter/assets/timetables/multi_cron/multi_cron_timetable_src.py b/orbiter/assets/timetables/multi_cron/multi_cron_timetable_src.py new file mode 100644 index 0000000..a58b5ca --- /dev/null +++ b/orbiter/assets/timetables/multi_cron/multi_cron_timetable_src.py @@ -0,0 +1,186 @@ +from typing import Any, Dict, List, Optional + +from airflow.exceptions import AirflowTimetableInvalid +from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable +from croniter import croniter +from pendulum import DateTime, Duration, timezone, now +from pendulum import instance as pendulum_instance + + +class MultiCronTimetable(Timetable): + valid_units = ["minutes", "hours", "days"] + + def __init__( + self, + cron_defs: List[str], + timezone: str = "UTC", + period_length: int = 0, + period_unit: str = "hours", + ): + self.cron_defs = cron_defs + self.timezone = timezone + self.period_length = period_length + self.period_unit = period_unit + + def __repr__(self) -> str: + return "MultiCronTimetable({})".format(self.cron_defs) + + def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval: + """ + Determines date interval for manually triggered runs. + This is simply (now - period) to now. 
+ """ + end = run_after + start = end if (self.period_length == 0) else self.data_period_start(end) + + return DataInterval(start=start, end=end) + + def next_dagrun_info( + self, + *, + last_automated_data_interval: Optional[DataInterval], + restriction: TimeRestriction, + ) -> Optional[DagRunInfo]: + """Determines when the DAG should be scheduled.""" + + if restriction.earliest is None: + # No start_date. Don't schedule. + return None + + is_first_run = last_automated_data_interval is None + + if is_first_run: + if restriction.catchup: + scheduled_time = self.next_scheduled_run_time(restriction.earliest) + + else: + scheduled_time = self.previous_scheduled_run_time() + if scheduled_time is None: + # No previous cron time matched. Find one in the future. + scheduled_time = self.next_scheduled_run_time() + else: + last_scheduled_time = last_automated_data_interval.end + + if restriction.catchup: + scheduled_time = self.next_scheduled_run_time(last_scheduled_time) + + else: + scheduled_time = self.previous_scheduled_run_time() + + if scheduled_time is None or scheduled_time == last_scheduled_time: + # No previous cron time matched, + # or the matched cron time was the last execution time, + scheduled_time = self.next_scheduled_run_time() + + elif scheduled_time > last_scheduled_time: + # Matched cron time was after last execution time, but before now. + # Use this cron time + pass + + else: + # The last execution time is after the most recent matching cron time. + # Next scheduled run will be in the future + scheduled_time = self.next_scheduled_run_time() + + if scheduled_time is None: + return None + + if restriction.latest is not None and scheduled_time > restriction.latest: + # Over the DAG's scheduled end; don't schedule. 
+ return None + + start = self.data_period_start(scheduled_time) + return DagRunInfo( + run_after=scheduled_time, + data_interval=DataInterval(start=start, end=scheduled_time), + ) + + def data_period_start(self, period_end: DateTime): + return period_end - Duration(**{self.period_unit: self.period_length}) + + def croniter_values(self, base_datetime=None): + if not base_datetime: + tz = timezone(self.timezone) + base_datetime = now(tz) + + return [croniter(expr, base_datetime) for expr in self.cron_defs] + + def next_scheduled_run_time(self, base_datetime: DateTime = None): + min_date = None + tz = timezone(self.timezone) + + base_datetime_localized = base_datetime.in_timezone(tz) if base_datetime else now(tz) + + for cron in self.croniter_values(base_datetime_localized): + next_date = cron.get_next(DateTime) + + min_date = next_date if not min_date else min(min_date, next_date) + + if min_date is None: + return None + return pendulum_instance(min_date) + + def previous_scheduled_run_time(self, base_datetime: DateTime = None): + """ + Get the most recent time in the past that matches one of the cron schedules + """ + max_date = None + tz = timezone(self.timezone) + + base_datetime_localized = base_datetime.in_timezone(tz) if base_datetime else now(tz) + + for cron in self.croniter_values(base_datetime_localized): + prev_date = cron.get_prev(DateTime) + + max_date = prev_date if not max_date else max(max_date, prev_date) + + if max_date is None: + return None + return pendulum_instance(max_date) + + def validate(self) -> None: + if not self.cron_defs: + raise AirflowTimetableInvalid("At least one cron definition must be present") + + if self.period_unit not in self.valid_units: + raise AirflowTimetableInvalid(f"period_unit must be one of {self.valid_units}") + + if self.period_length < 0: + raise AirflowTimetableInvalid("period_length must not be less than zero") + + try: + self.croniter_values() + except Exception as e: + raise AirflowTimetableInvalid(str(e)) from 
None + + @property + def summary(self) -> str: + """A short summary for the timetable. + + This is used to display the timetable in the web UI. A cron expression + timetable, for example, can use this to display the expression. + """ + return " || ".join(self.cron_defs) + f" [TZ: {self.timezone}]" + + def serialize(self) -> Dict[str, Any]: + """Serialize the timetable for JSON encoding. + + This is called during DAG serialization to store timetable information + in the database. This should return a JSON-serializable dict that will + be fed into ``deserialize`` when the DAG is deserialized. + """ + return dict( + cron_defs=self.cron_defs, + timezone=self.timezone, + period_length=self.period_length, + period_unit=self.period_unit, + ) + + @classmethod + def deserialize(cls, data: Dict[str, Any]) -> "MultiCronTimetable": + """Deserialize a timetable from data. + + This is called when a serialized DAG is deserialized. ``data`` will be + whatever was returned by ``serialize`` during DAG serialization. 
+ """ + return cls(**data) diff --git a/orbiter/objects/project.py b/orbiter/objects/project.py index 2597818..8aa67a8 100644 --- a/orbiter/objects/project.py +++ b/orbiter/objects/project.py @@ -218,8 +218,8 @@ def add_dags(self, dags: OrbiterDAG | Iterable[OrbiterDAG]) -> "OrbiterProject": OrbiterRequirement(names=[BashOperator], package=apache-airflow, module=airflow.operators.bash, sys_package=None), OrbiterRequirement(names=[send_smtp_notification], package=apache-airflow-providers-smtp, module=airflow.providers.smtp.notifications.smtp, sys_package=None), OrbiterRequirement(names=[TaskGroup], package=apache-airflow, module=airflow.utils.task_group, sys_package=None), - OrbiterRequirement(names=[DateTime,Timezone], package=pendulum, module=pendulum, sys_package=None), - OrbiterRequirement(names=[MultiCronTimetable], package=croniter, module=plugins.multi_cron_timetable, sys_package=None)], + OrbiterRequirement(names=[MultiCronTimetable], package=croniter, module=include.multi_cron_timetable, sys_package=None), + OrbiterRequirement(names=[DateTime,Timezone], package=pendulum, module=pendulum, sys_package=None)], pools=['foo'], connections=['SMTP', 'foo'], variables=['foo'], diff --git a/orbiter/objects/timetables/multi_cron_timetable.py b/orbiter/objects/timetables/multi_cron_timetable.py index d596e52..1936e97 100644 --- a/orbiter/objects/timetables/multi_cron_timetable.py +++ b/orbiter/objects/timetables/multi_cron_timetable.py @@ -1,216 +1,13 @@ from __future__ import annotations +from importlib.util import find_spec +from pathlib import Path from typing import List, Set from orbiter.objects import OrbiterRequirement, ImportList +from orbiter.objects.include import OrbiterInclude from orbiter.objects.task import RenderAttributes from orbiter.objects.timetables import OrbiterTimetable -from orbiter.objects.include import OrbiterInclude - -TIMETABLE_CONTENTS = '''from typing import Any, Dict, List, Optional - -import pendulum -from airflow.exceptions import 
AirflowTimetableInvalid -from airflow.plugins_manager import AirflowPlugin -from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable -from croniter import croniter -from pendulum import DateTime, Duration -from pendulum import instance as pendulum_instance -from pendulum import timezone - - -class MultiCronTimetable(Timetable): - valid_units = ["minutes", "hours", "days"] - - def __init__( - self, - cron_defs: List[str], - timezone: str = "UTC", - period_length: int = 0, - period_unit: str = "hours", - ): - - self.cron_defs = cron_defs - self.timezone = timezone - self.period_length = period_length - self.period_unit = period_unit - - def __repr__(self) -> str: - return "MultiCronTimetable({})".format(self.cron_defs) - - def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval: - """ - Determines date interval for manually triggered runs. - This is simply (now - period) to now. - """ - end = run_after - start = end if (self.period_length == 0) else self.data_period_start(end) - - return DataInterval(start=start, end=end) - - def next_dagrun_info( - self, - *, - last_automated_data_interval: Optional[DataInterval], - restriction: TimeRestriction, - ) -> Optional[DagRunInfo]: - """Determines when the DAG should be scheduled.""" - - if restriction.earliest is None: - # No start_date. Don't schedule. - return None - - is_first_run = last_automated_data_interval is None - - if is_first_run: - if restriction.catchup: - scheduled_time = self.next_scheduled_run_time(restriction.earliest) - - else: - scheduled_time = self.previous_scheduled_run_time() - if scheduled_time is None: - # No previous cron time matched. Find one in the future. 
- scheduled_time = self.next_scheduled_run_time() - else: - last_scheduled_time = last_automated_data_interval.end - - if restriction.catchup: - scheduled_time = self.next_scheduled_run_time(last_scheduled_time) - - else: - scheduled_time = self.previous_scheduled_run_time() - - if scheduled_time is None or scheduled_time == last_scheduled_time: - # No previous cron time matched, - # or the matched cron time was the last execution time, - scheduled_time = self.next_scheduled_run_time() - - elif scheduled_time > last_scheduled_time: - # Matched cron time was after last execution time, but before now. - # Use this cron time - pass - - else: - # The last execution time is after the most recent matching cron time. - # Next scheduled run will be in the future - scheduled_time = self.next_scheduled_run_time() - - if scheduled_time is None: - return None - - if restriction.latest is not None and scheduled_time > restriction.latest: - # Over the DAG's scheduled end; don't schedule. - return None - - start = self.data_period_start(scheduled_time) - return DagRunInfo( - run_after=scheduled_time, - data_interval=DataInterval(start=start, end=scheduled_time), - ) - - def data_period_start(self, period_end: DateTime): - return period_end - Duration(**{self.period_unit: self.period_length}) - - def croniter_values(self, base_datetime=None): - if not base_datetime: - tz = timezone(self.timezone) - base_datetime = pendulum.now(tz) - - return [croniter(expr, base_datetime) for expr in self.cron_defs] - - def next_scheduled_run_time(self, base_datetime: DateTime = None): - min_date = None - tz = timezone(self.timezone) - - base_datetime_localized = ( - base_datetime.in_timezone(tz) if base_datetime else pendulum.now(tz) - ) - - for cron in self.croniter_values(base_datetime_localized): - next_date = cron.get_next(DateTime) - - min_date = next_date if not min_date else min(min_date, next_date) - - if min_date is None: - return None - return pendulum_instance(min_date) - - def 
previous_scheduled_run_time(self, base_datetime: DateTime = None): - """ - Get the most recent time in the past that matches one of the cron schedules - """ - max_date = None - tz = timezone(self.timezone) - - base_datetime_localized = ( - base_datetime.in_timezone(tz) if base_datetime else pendulum.now(tz) - ) - - for cron in self.croniter_values(base_datetime_localized): - prev_date = cron.get_prev(DateTime) - - max_date = prev_date if not max_date else max(max_date, prev_date) - - if max_date is None: - return None - return pendulum_instance(max_date) - - def validate(self) -> None: - if not self.cron_defs: - raise AirflowTimetableInvalid( - "At least one cron definition must be present" - ) - - if self.period_unit not in self.valid_units: - raise AirflowTimetableInvalid( - f"period_unit must be one of {self.valid_units}" - ) - - if self.period_length < 0: - raise AirflowTimetableInvalid("period_length must not be less than zero") - - try: - self.croniter_values() - except Exception as e: - raise AirflowTimetableInvalid(str(e)) from None - - @property - def summary(self) -> str: - """A short summary for the timetable. - - This is used to display the timetable in the web UI. A cron expression - timetable, for example, can use this to display the expression. - """ - return " || ".join(self.cron_defs) + f" [TZ: {self.timezone}]" - - def serialize(self) -> Dict[str, Any]: - """Serialize the timetable for JSON encoding. - - This is called during DAG serialization to store timetable information - in the database. This should return a JSON-serializable dict that will - be fed into ``deserialize`` when the DAG is deserialized. - """ - return dict( - cron_defs=self.cron_defs, - timezone=self.timezone, - period_length=self.period_length, - period_unit=self.period_unit, - ) - - @classmethod - def deserialize(cls, data: Dict[str, Any]) -> "MultiCronTimetable": - """Deserialize a timetable from data. - - This is called when a serialized DAG is deserialized. 
``data`` will be - whatever was returned by ``serialize`` during DAG serialization. - """ - return cls(**data) - - -class CustomTimetablePlugin(AirflowPlugin): - name = "multi_cron_timetable" - timetables = [MultiCronTimetable] -''' class OrbiterMultiCronTimetable(OrbiterTimetable): @@ -237,12 +34,23 @@ class OrbiterMultiCronTimetable(OrbiterTimetable): imports: ImportList = [ OrbiterRequirement( package="croniter", - module="plugins.multi_cron_timetable", + module="include.multi_cron_timetable", names=["MultiCronTimetable"], ) ] orbiter_includes: Set["OrbiterInclude"] = { - OrbiterInclude(filepath="plugins/multi_cron_timetable.py", contents=TIMETABLE_CONTENTS) + OrbiterInclude( + filepath="include/multi_cron_timetable.py", + contents=Path( + find_spec("orbiter.assets.timetables.multi_cron.multi_cron_timetable_src").origin + ).read_text(), + ), + OrbiterInclude( + filepath="plugins/multi_cron_timetable.py", + contents=Path( + find_spec("orbiter.assets.timetables.multi_cron.multi_cron_timetable_plugin_src").origin + ).read_text(), + ), } render_attributes: RenderAttributes = [ "cron_defs", diff --git a/pyproject.toml b/pyproject.toml index 5c2e155..b334572 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -148,7 +148,7 @@ docstring-code-line-length = 80 [tool.pytest.ini_options] pythonpath = ["."] norecursedirs = [ - "*output*", "*dags/", "*dags/*", "*override*", '*artifacts*', + "*output*", "*dags/", "*dags/*", "*override*", '*artifacts*', "*assets*", "venv", "*.egg", ".eggs", "dist", "build", "docs", ".tox", ".git", "__pycache__" ] testpaths = ["orbiter", "tests"] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration_test.py b/tests/integration_test.py index 6655af4..4e0b474 100644 --- a/tests/integration_test.py +++ b/tests/integration_test.py @@ -8,5 +8,6 @@ def test_integration(): output = run("just docker-run-binary", shell=True, capture_output=True, text=True) assert "Available Origins" 
in output.stdout - assert "Adding local .pyz files ['/data/orbiter_community_translations.pyz'] to sys.path" in output.stdout - assert "Finding files with extension=['.xml'] in /data/workflow" in output.stdout + assert "Adding local .pyz files ['/data/orbiter_translations.pyz.pyz'] to sys.path" in output.stdout + assert "Translating [File 0]=/data/workflow/demo.xml" in output.stdout + assert "Writing /data/output/dags" in output.stdout diff --git a/tests/orbiter/objects/project_test.py b/tests/orbiter/objects/project_test.py index 0225c13..7c8a22f 100644 --- a/tests/orbiter/objects/project_test.py +++ b/tests/orbiter/objects/project_test.py @@ -9,6 +9,7 @@ from orbiter.objects.pool import OrbiterPool from orbiter.objects.project import OrbiterProject from orbiter.objects.task import OrbiterTask +from orbiter.objects.timetables.multi_cron_timetable import OrbiterMultiCronTimetable from orbiter.objects.variable import OrbiterVariable @@ -17,7 +18,12 @@ def test_project_render(tmpdir): # noinspection PyArgumentList project = OrbiterProject().add_dags( dags=[ - OrbiterDAG(dag_id="foo", file_path="foo.py", schedule=None, doc_md="foo").add_tasks( + OrbiterDAG( + dag_id="foo", + file_path="foo.py", + schedule=OrbiterMultiCronTimetable(cron_defs=["0 1 * * *", "*/5 0 * * *"]), + doc_md="foo", + ).add_tasks( tasks=[ OrbiterTask( task_id="foo", @@ -46,7 +52,7 @@ def test_project_render(tmpdir): project.render(tmpdir) actual_requirements = (tmpdir / "requirements.txt").read_text() - expected_requirements = "apache-airflow\npendulum" + expected_requirements = "apache-airflow\ncroniter\npendulum" assert actual_requirements == expected_requirements actual_packages = (tmpdir / "packages.txt").read_text() @@ -77,7 +83,18 @@ def test_project_render(tmpdir): actual_dag = actual_dag.read_text() expected_dag = """from airflow import DAG from airflow.operators.empty import EmptyOperator +from include.multi_cron_timetable import MultiCronTimetable from pendulum import DateTime, Timezone 
-with DAG(dag_id='foo', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, doc_md='foo'): +with DAG(dag_id='foo', schedule=MultiCronTimetable(cron_defs=['0 1 * * *', '*/5 0 * * *']), start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, doc_md='foo'): foo_task = EmptyOperator(task_id='foo', doc='some other thing')""" assert actual_dag == expected_dag + + actual_include = tmpdir / "include/multi_cron_timetable.py" + assert actual_include.exists(), actual_include + actual_include = actual_include.read_text() + assert "class MultiCronTimetable(Timetable):" in actual_include + + actual_plugin = tmpdir / "plugins/multi_cron_timetable.py" + assert actual_plugin.exists(), actual_plugin + actual_plugin = actual_plugin.read_text() + assert "class MultiCronTimetablePlugin(AirflowPlugin)" in actual_plugin