diff --git a/dags/daily_check_notices_availability_in_cellar.py b/dags/daily_check_notices_availability_in_cellar.py index 03a4c6ea..cd88777a 100644 --- a/dags/daily_check_notices_availability_in_cellar.py +++ b/dags/daily_check_notices_availability_in_cellar.py @@ -1,3 +1,8 @@ +""" +This DAG is responsible for checking the availability of notices in the cellar. +""" + + from airflow.decorators import dag, task from pymongo import MongoClient @@ -11,6 +16,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, catchup=False, schedule_interval="0 0 * * *", tags=['daily', 'validation']) diff --git a/dags/daily_materialized_views_update.py b/dags/daily_materialized_views_update.py index 2618f0f0..1cc9698c 100644 --- a/dags/daily_materialized_views_update.py +++ b/dags/daily_materialized_views_update.py @@ -1,3 +1,7 @@ +""" +This DAG is responsible for updating the materialized views of the notice and batch collections. +""" + from airflow.decorators import dag, task from pymongo import MongoClient @@ -13,6 +17,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, catchup=False, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, schedule_interval="0 6 * * *", tags=['mongodb', 'daily-views-update']) def daily_materialized_views_update(): diff --git a/dags/fetch_notices_by_date.py b/dags/fetch_notices_by_date.py index 792457e4..8d4d550f 100644 --- a/dags/fetch_notices_by_date.py +++ b/dags/fetch_notices_by_date.py @@ -1,3 +1,7 @@ +""" +This DAG is used to fetch notices from TED by date. +""" + from datetime import date, datetime from airflow.decorators import dag, task @@ -29,6 +33,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, catchup=False, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, timetable=CronTriggerTimetable('0 1 * * *', timezone='UTC'), tags=['selector', 'daily-fetch'], params={ diff --git a/dags/fetch_notices_by_date_range.py b/dags/fetch_notices_by_date_range.py index 3d2e7ebf..1ddb740a 100644 --- a/dags/fetch_notices_by_date_range.py +++ b/dags/fetch_notices_by_date_range.py @@ -1,3 +1,7 @@ +""" +This DAG is responsible for fetching notices from TED by date range. +""" + from datetime import datetime, date from typing import Any @@ -36,6 +40,8 @@ def generate_wildcards_foreach_day_in_range(start_date: str, end_date: str) -> l @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'], + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, params={ START_DATE_KEY: Param( default=f"{date.today()}", diff --git a/dags/fetch_notices_by_query.py b/dags/fetch_notices_by_query.py index bb8c67b1..0ed6fc62 100644 --- a/dags/fetch_notices_by_query.py +++ b/dags/fetch_notices_by_query.py @@ -1,3 +1,7 @@ +""" +This DAG is responsible for fetching notices from TED by query. +""" + from datetime import date from airflow.decorators import dag, task @@ -26,6 +30,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['fetch'], + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, params={ QUERY_DAG_KEY: Param( default=f"PD=[>={date.today().strftime('%Y%m%d')} AND <={date.today().strftime('%Y%m%d')}]", diff --git a/dags/load_mapping_suite_in_database.py b/dags/load_mapping_suite_in_database.py index b4f8f7d0..c4dc27b9 100644 --- a/dags/load_mapping_suite_in_database.py +++ b/dags/load_mapping_suite_in_database.py @@ -1,3 +1,7 @@ +""" +This DAG is responsible for loading a mapping suite package into the database. +""" + from airflow.decorators import dag, task from airflow.models import Param from airflow.operators.empty import EmptyOperator @@ -32,6 +36,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['fetch', 'mapping-suite', 'github'], + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, params={ GITHUB_REPOSITORY_URL_DAG_PARAM_KEY: Param( default=None, diff --git a/dags/load_notices_in_fuseki.py b/dags/load_notices_in_fuseki.py index c3abadab..f1863a56 100644 --- a/dags/load_notices_in_fuseki.py +++ b/dags/load_notices_in_fuseki.py @@ -1,3 +1,7 @@ +""" +This DAG is used to load notices in Fuseki. +""" + from airflow.decorators import dag, task from airflow.models import Param from pymongo import MongoClient @@ -19,6 +23,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['load', 'notices', 'fuseki'], + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, params={ FUSEKI_DATASET_NAME_DAG_PARAM_KEY: Param( default=DEFAULT_FUSEKI_DATASET_NAME, diff --git a/dags/notice_processing_pipeline.py b/dags/notice_processing_pipeline.py index a92b72af..3e424bc4 100644 --- a/dags/notice_processing_pipeline.py +++ b/dags/notice_processing_pipeline.py @@ -1,3 +1,7 @@ +""" +This DAG is used to process notices in batch mode. +""" + from typing import List from airflow.operators.python import BranchPythonOperator, PythonOperator @@ -47,6 +51,8 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, max_active_runs=AIRFLOW_NUMBER_OF_WORKERS, max_active_tasks=1, tags=['worker', 'pipeline']) diff --git a/dags/reprocess_published_in_cellar_notices.py b/dags/reprocess_published_in_cellar_notices.py index 605f7313..1e37248d 100644 --- a/dags/reprocess_published_in_cellar_notices.py +++ b/dags/reprocess_published_in_cellar_notices.py @@ -1,3 +1,7 @@ +""" +This DAG is responsible for re-transforming notices that are in the cellar and are publicly available. +""" + from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS @@ -20,6 +24,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, tags=['selector', 're-transform-publicly-available'], params=REPROCESS_DAG_PARAMS ) diff --git a/dags/reprocess_unnormalised_notices_from_backlog.py b/dags/reprocess_unnormalised_notices_from_backlog.py index 5c7f7a6d..111cbf41 100644 --- a/dags/reprocess_unnormalised_notices_from_backlog.py +++ b/dags/reprocess_unnormalised_notices_from_backlog.py @@ -1,3 +1,7 @@ +""" +This DAG is used to reprocess all notices in RAW status from the backlog. +""" + from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param @@ -16,6 +20,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, tags=['selector', 'raw-notices'], params=REPROCESS_DATE_RANGE_DAG_PARAMS ) diff --git a/dags/reprocess_unpackaged_notices_from_backlog.py b/dags/reprocess_unpackaged_notices_from_backlog.py index f4b91b8c..bef3e849 100644 --- a/dags/reprocess_unpackaged_notices_from_backlog.py +++ b/dags/reprocess_unpackaged_notices_from_backlog.py @@ -1,3 +1,7 @@ +""" +This DAG is responsible for re-processing notices that are not packaged. +""" + from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS @@ -22,6 +26,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, tags=['selector', 're-package'], params=REPROCESS_DAG_PARAMS ) diff --git a/dags/reprocess_unpublished_notices_from_backlog.py b/dags/reprocess_unpublished_notices_from_backlog.py index 3fe77fd3..ad9f567e 100644 --- a/dags/reprocess_unpublished_notices_from_backlog.py +++ b/dags/reprocess_unpublished_notices_from_backlog.py @@ -1,3 +1,7 @@ +""" +This DAG is used to re-publish notices. +""" + from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS @@ -22,6 +26,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, tags=['selector', 're-publish'], params=REPROCESS_DAG_PARAMS ) diff --git a/dags/reprocess_untransformed_notices_from_backlog.py b/dags/reprocess_untransformed_notices_from_backlog.py index 2bed3628..d65503ab 100644 --- a/dags/reprocess_untransformed_notices_from_backlog.py +++ b/dags/reprocess_untransformed_notices_from_backlog.py @@ -1,3 +1,7 @@ +""" +This DAG is used to re-transform notices. +""" + from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS @@ -24,6 +28,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, tags=['selector', 're-transform'], params=REPROCESS_DAG_PARAMS ) diff --git a/dags/reprocess_unvalidated_notices_from_backlog.py b/dags/reprocess_unvalidated_notices_from_backlog.py index 39d398a8..827b6410 100644 --- a/dags/reprocess_unvalidated_notices_from_backlog.py +++ b/dags/reprocess_unvalidated_notices_from_backlog.py @@ -1,3 +1,7 @@ +""" +This DAG is used to re-validate notices. +""" + from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS @@ -20,6 +24,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, tags=['selector', 're-validate'], params=REPROCESS_DAG_PARAMS ) diff --git a/dags/test_dag_a.py b/dags/test_dag_a.py deleted file mode 100644 index ff7dc0ab..00000000 --- a/dags/test_dag_a.py +++ /dev/null @@ -1,55 +0,0 @@ -import datetime - -from airflow.decorators import dag, task -from airflow.models.param import Param - -from dags import DEFAULT_DAG_ARGUMENTS -from dags.dags_utils import get_dag_param - -DAG_NAME = "test_dag_a" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['test', 'new-ui'], - params={ - FORM_NUMBER_DAG_PARAM: Param( - default=None, - type=["null", "string"], - title="Form number", - description="""Form number of the notice"""), - START_DATE_DAG_PARAM: Param( - default=None, - type=["null", "string"], - format="date", - title="Date Picker", - description="Please select a date, use the button on the left for a pup-up calendar. " - "See that here are no times!", - ), - END_DATE_DAG_PARAM: Param( - default=f"{datetime.date.today()}", - type=["null", "string"], - format="date", - title="Date Picker", - description="Please select a date, use the button on the left for a pup-up calendar. " - "See that here are no times!", - ), - } - ) -def test_dag_a(): - @task - def print_configs(): - form_number = get_dag_param(key=FORM_NUMBER_DAG_PARAM) - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - print(f"form_number: {form_number}") - print(f"start_date: {start_date}") - print(f"end_date: {end_date}") - - print_configs() - - -dag = test_dag_a() diff --git a/dags/test_dag_b_trigger.py b/dags/test_dag_b_trigger.py deleted file mode 100644 index 79534179..00000000 --- a/dags/test_dag_b_trigger.py +++ /dev/null @@ -1,59 +0,0 @@ -import datetime -from uuid import uuid4 - -from airflow.decorators import dag, task -from airflow.models.param import Param -from airflow.operators.python import get_current_context -from airflow.operators.trigger_dagrun import TriggerDagRunOperator - -from dags import DEFAULT_DAG_ARGUMENTS - -DAG_NAME = "test_dag_b_trigger" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['test', 'new-ui'], - # params={ - # FORM_NUMBER_DAG_PARAM: Param( - # default=None, - # type=["null", "string"], - # title="Form number", - # description="""Form number of the notice"""), - # START_DATE_DAG_PARAM: Param( - # default=None, - # type=["null", "string"], - # format="date", - # title="Date Picker", - # description="Please select a date, use the button on the left for a pup-up calendar. " - # "See that here are no times!", - # ), - # END_DATE_DAG_PARAM: Param( - # default=f"{datetime.date.today()}", - # type=["null", "string"], - # format="date", - # title="Date Picker", - # description="Please select a date, use the button on the left for a pup-up calendar. " - # "See that here are no times!", - # ), - # } - ) -def test_dag_b(): - @task - def trigger_test_dag_a(): - context = get_current_context() - TriggerDagRunOperator( - task_id=f'trigger_worker_dag_{uuid4().hex}', - trigger_dag_id="test_dag_a", - conf={ - FORM_NUMBER_DAG_PARAM: "test_form_number", - } - ).execute(context=context) - - trigger_test_dag_a() - - -dag = test_dag_b() diff --git a/dags/test_new_trigger_ui.py b/dags/test_new_trigger_ui.py deleted file mode 100644 index 1b525b45..00000000 --- a/dags/test_new_trigger_ui.py +++ /dev/null @@ -1,312 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""DAG demonstrating various options for a trigger form generated by DAG params. - -The DAG attribute `params` is used to define a default dictionary of parameters which are usually passed -to the DAG and which are used to render a trigger form. -""" -#from __future__ import annotations - -import datetime -import json -from pathlib import Path - -from airflow import DAG -from airflow.decorators import task -from airflow.exceptions import AirflowSkipException -from airflow.models.dagrun import DagRun -from airflow.models.param import Param -from airflow.models.taskinstance import TaskInstance - -with DAG( - dag_id=Path(__file__).stem, - description=__doc__[0: __doc__.find(".")], - doc_md=__doc__, - schedule=None, - start_date=datetime.datetime(2022, 3, 4), - catchup=False, - tags=["example_ui"], - params={ - # Let's start simple: Standard dict values are detected from type and offered as entry form fields. - # Detected types are numbers, text, boolean, lists and dicts. - # Note that such auto-detected parameters are treated as optional (not required to contain a value) - "x": 3, - "text": "Hello World!", - "flag": False, - "a_simple_list": ["one", "two", "three", "actually one value is made per line"], - # But of course you might want to have it nicer! Let's add some description to parameters. - # Note if you can add any HTML formatting to the description, you need to use the description_html - # attribute. - "most_loved_number": Param( - 42, - type="integer", - title="Your favorite number", - description_html="""Everybody should have a favorite number. Not only math teachers. - If you can not think of any at the moment please think of the 42 which is very famous because - of the book - - The Hitchhiker's Guide to the Galaxy""", - ), - # If you want to have a selection list box then you can use the enum feature of JSON schema - "pick_one": Param( - "value 42", - type="string", - title="Select one Value", - description="You can use JSON schema enum's to generate drop down selection boxes.", - enum=[f"value {i}" for i in range(16, 64)], - ), - # You can also label the selected values via values_display attribute - "pick_with_label": Param( - 3, - type="number", - title="Select one Number", - description="With drop down selections you can also have nice display labels for the values.", - enum=[*range(1, 10)], - values_display={ - 1: "One", - 2: "Two", - 3: "Three", - 4: "Four - is like you take three and get one for free!", - 5: "Five", - 6: "Six", - 7: "Seven", - 8: "Eight", - 9: "Nine", - }, - ), - # If you want to have a list box with proposals but not enforcing a fixed list - # then you can use the examples feature of JSON schema - "proposals": Param( - "some value", - type="string", - title="Field with proposals", - description="You can use JSON schema examples's to generate drop down selection boxes " - "but allow also to enter custom values. Try typing an 'a' and see options.", - examples=( - "Alpha,Bravo,Charlie,Delta,Echo,Foxtrot,Golf,Hotel,India,Juliett,Kilo,Lima,Mike,November,Oscar,Papa," - "Quebec,Romeo,Sierra,Tango,Uniform,Victor,Whiskey,X-ray,Yankee,Zulu" - ).split(","), - ), - # If you want to select multiple items from a fixed list JSON schema des not allow to use enum - # In this case the type "array" is being used together with "examples" as pick list - "multi_select": Param( - ["two", "three"], - "Select from the list of options.", - type="array", - title="Multi Select", - examples=["one", "two", "three", "four", "five"], - ), - # A multiple options selection can also be combined with values_display - "multi_select_with_label": Param( - ["2", "3"], - "Select from the list of options. See that options can have nicer text and still technical values" - "are propagated as values during trigger to the DAG.", - type="array", - title="Multi Select with Labels", - examples=["1", "2", "3", "4", "5"], - values_display={ - "1": "One box of choccolate", - "2": "Two bananas", - "3": "Three apples", - # Note: Value display mapping does not need to be complete.s - }, - ), - # An array of numbers - "array_of_numbers": Param( - [1, 2, 3], - "Only integers are accepted in this array", - type="array", - title="Array of numbers", - items={"type": "number"}, - ), - # Boolean as proper parameter with description - "bool": Param( - True, - type="boolean", - title="Please confirm", - description="A On/Off selection with a proper description.", - ), - # Dates and Times are also supported - "date_time": Param( - f"{datetime.date.today()}T{datetime.time(hour=12, minute=17, second=00)}+00:00", - type="string", - format="date-time", - title="Date-Time Picker", - description="Please select a date and time, use the button on the left for a pup-up calendar.", - ), - "date": Param( - f"{datetime.date.today()}", - type="string", - format="date", - title="Date Picker", - description="Please select a date, use the button on the left for a pup-up calendar. " - "See that here are no times!", - ), - "time": Param( - f"{datetime.time(hour=12, minute=13, second=14)}", - type=["string", "null"], - format="time", - title="Time Picker", - description="Please select a time, use the button on the left for a pup-up tool.", - ), - # Fields can be required or not. If the defined fields are typed they are getting required by default - # (else they would not pass JSON schema validation) - to make typed fields optional you must - # permit the optional "null" type - "required_field": Param( - "You can not trigger if no text is given here!", - type="string", - title="Required text field", - description="This field is required. You can not submit without having text in here.", - ), - "optional_field": Param( - "optional text, you can trigger also w/o text", - type=["null", "string"], - title="Optional text field", - description_html="This field is optional. As field content is JSON schema validated you must " - "allow the null type.", - ), - # You can arrange the entry fields in sections so that you can have a better overview for the user - # Therefore you can add the "section" attribute. - # The benefit of the Params class definition is that the full scope of JSON schema validation - # can be leveraged for form fields and they will be validated before DAG submission. - "checked_text": Param( - "length-checked-field", - type="string", - title="Text field with length check", - description_html="""This field is required. And you need to provide something between 10 and 30 - characters. See the - - JSON schema description (string) in for more details""", - minLength=10, - maxLength=20, - section="JSON Schema validation options", - ), - "checked_number": Param( - 100, - type="number", - title="Number field with value check", - description_html="""This field is required. You need to provide any number between 64 and 128. - See the - JSON schema description (numbers) in for more details""", - minimum=64, - maximum=128, - section="JSON Schema validation options", - ), - # Some further cool stuff as advanced options are also possible - # You can have the user entering a dict object as a JSON with validation - "object": Param( - {"key": "value"}, - type=["object", "null"], - title="JSON entry field", - section="Special advanced stuff with form fields", - ), - "array_of_objects": Param( - [{"name": "account_name", "country": "country_name"}], - "Array with complex objects and validation rules. " - "See JSON Schema validation options in specs.", - type="array", - title="JSON array field", - items={ - "type": "object", - "properties": {"name": {"type": "string"}, "country_name": {"type": "string"}}, - "required": ["name"], - }, - section="Special advanced stuff with form fields", - ), - # If you want to have static parameters which are always passed and not editable by the user - # then you can use the JSON schema option of passing constant values. These parameters - # will not be displayed but passed to the DAG - "hidden_secret_field": Param("constant value", const="constant value"), - # Finally besides the standard provided field generator you can have you own HTML form code - # injected - but be careful, you can also mess-up the layout! - "color_picker": Param( - "#FF8800", - type="string", - title="Pick a color", - description_html="""This is a special HTML widget as custom implementation in the DAG code. - It is templated with the following parameter to render proper HTML form fields: - - Example: <input name='{name}' value='{value}' onchange='updateJSONconf()' /> - """, - custom_html_form=""" -
- - - - -
 
- -
- - - -
- - - -
- """, - section="Special advanced stuff with form fields", - ), - }, -) as dag: - @task(task_id="show_params") - def show_params(**kwargs) -> None: - ti: TaskInstance = kwargs["ti"] - dag_run: DagRun = ti.dag_run - if not dag_run.conf: - print("Uups, no parameters supplied as DagRun.conf, was the trigger w/o form?") - raise AirflowSkipException("No DagRun.conf parameters supplied.") - print(f"This DAG was triggered with the following parameters:\n{json.dumps(dag_run.conf, indent=4)}") - - - show_params()