diff --git a/dags/daily_check_notices_availability_in_cellar.py b/dags/daily_check_notices_availability_in_cellar.py index 03a4c6ea..cd88777a 100644 --- a/dags/daily_check_notices_availability_in_cellar.py +++ b/dags/daily_check_notices_availability_in_cellar.py @@ -1,3 +1,8 @@ +""" +This DAG is responsible for checking the availability of notices in the cellar. +""" + + from airflow.decorators import dag, task from pymongo import MongoClient @@ -11,6 +16,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, catchup=False, schedule_interval="0 0 * * *", tags=['daily', 'validation']) diff --git a/dags/daily_materialized_views_update.py b/dags/daily_materialized_views_update.py index 2618f0f0..1cc9698c 100644 --- a/dags/daily_materialized_views_update.py +++ b/dags/daily_materialized_views_update.py @@ -1,3 +1,7 @@ +""" +This DAG is responsible for updating the materialized views of the notice and batch collections. +""" + from airflow.decorators import dag, task from pymongo import MongoClient @@ -13,6 +17,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, catchup=False, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, schedule_interval="0 6 * * *", tags=['mongodb', 'daily-views-update']) def daily_materialized_views_update(): diff --git a/dags/fetch_notices_by_date.py b/dags/fetch_notices_by_date.py index f872dc8d..8d4d550f 100644 --- a/dags/fetch_notices_by_date.py +++ b/dags/fetch_notices_by_date.py @@ -1,7 +1,12 @@ -from datetime import timedelta +""" +This DAG is used to fetch notices from TED by date. +""" + +from datetime import date, datetime from airflow.decorators import dag, task -from airflow.operators.dummy import DummyOperator +from airflow.models import Param +from airflow.operators.empty import EmptyOperator from airflow.operators.python import BranchPythonOperator, PythonOperator from airflow.utils.trigger_rule import TriggerRule from airflow.timetables.trigger import CronTriggerTimetable @@ -28,8 +33,28 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, catchup=False, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, timetable=CronTriggerTimetable('0 1 * * *', timezone='UTC'), - tags=['selector', 'daily-fetch']) + tags=['selector', 'daily-fetch'], + params={ + WILD_CARD_DAG_KEY: Param( + default=f"{date.today()}", + type="string", + format="date", + title="Date", + description="""This field is required. + Date to fetch notices from TED.""" + ), + TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param( + default=True, + type="boolean", + title="Trigger Complete Workflow", + description="""This field is required. + If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered.""" + ) + } + ) def fetch_notices_by_date(): @task @event_log(TechnicalEventMessage( @@ -39,7 +64,9 @@ def fetch_notices_by_date(): )) ) def fetch_by_date_notice_from_ted(): - notice_ids = notice_fetcher_by_date_pipeline(date_wild_card=get_dag_param(key=WILD_CARD_DAG_KEY)) + selected_date = datetime.strptime(get_dag_param(key=WILD_CARD_DAG_KEY, raise_error=True), "%Y-%m-%d") + date_wild_card = datetime.strftime(selected_date, "%Y%m%d*") + notice_ids = notice_fetcher_by_date_pipeline(date_wild_card=date_wild_card) if not notice_ids: log_error("No notices has been fetched!") else: @@ -62,9 +89,7 @@ def validate_fetched_notices(): from datetime import datetime from pymongo import MongoClient - publication_date = datetime.strptime(get_dag_param(key=WILD_CARD_DAG_KEY, - default_value=(datetime.now() - timedelta(days=1)).strftime( - "%Y%m%d*")), "%Y%m%d*") + publication_date = datetime.strptime(get_dag_param(key=WILD_CARD_DAG_KEY), "%Y-%m-%d") mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) validate_and_update_daily_supra_notice(ted_publication_date=publication_date, mongodb_client=mongodb_client) @@ -91,7 +116,7 @@ def _branch_selector(): python_callable=validate_fetched_notices ) - finish_step = DummyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID, + finish_step = EmptyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID, trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) fetch_by_date_notice_from_ted() >> branch_task >> [trigger_normalisation_workflow, diff --git a/dags/fetch_notices_by_date_range.py b/dags/fetch_notices_by_date_range.py index 4c4f0e6c..1ddb740a 100644 --- a/dags/fetch_notices_by_date_range.py +++ b/dags/fetch_notices_by_date_range.py @@ -1,5 +1,11 @@ -from datetime import datetime +""" +This DAG is responsible for fetching notices from TED by date range. +""" + +from datetime import datetime, date from typing import Any + +from airflow.models import Param from dateutil import rrule from airflow.decorators import dag, task @@ -29,11 +35,39 @@ def generate_wildcards_foreach_day_in_range(start_date: str, end_date: str) -> l """ return [dt.strftime('%Y%m%d*') for dt in rrule.rrule(rrule.DAILY, - dtstart=datetime.strptime(start_date, '%Y%m%d'), - until=datetime.strptime(end_date, '%Y%m%d'))] + dtstart=datetime.strptime(start_date, '%Y-%m-%d'), + until=datetime.strptime(end_date, '%Y-%m-%d'))] -@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master']) +@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'], + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, + params={ + START_DATE_KEY: Param( + default=f"{date.today()}", + type="string", + format="date", + title="Start Date", + description="""This field is required. + Start date of the date range to fetch notices from TED.""" + ), + END_DATE_KEY: Param( + default=f"{date.today()}", + type="string", + format="date", + title="End Date", + description="""This field is required. + End date of the date range to fetch notices from TED.""" + ), + TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param( + default=False, + type="boolean", + title="Trigger Complete Workflow", + description="""This field is required. + If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered.""" + ) + } + ) def fetch_notices_by_date_range(): @task @event_log(TechnicalEventMessage( @@ -46,7 +80,8 @@ def trigger_notice_by_date_for_each_date_in_range(): context: Any = get_current_context() start_date = get_dag_param(key=START_DATE_KEY, raise_error=True) end_date = get_dag_param(key=END_DATE_KEY, raise_error=True) - trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, default_value=False) + trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, + default_value=False) date_wildcards = generate_wildcards_foreach_day_in_range(start_date, end_date) for date_wildcard in date_wildcards: TriggerDagRunOperator( diff --git a/dags/fetch_notices_by_query.py b/dags/fetch_notices_by_query.py index 1c019acf..0ed6fc62 100644 --- a/dags/fetch_notices_by_query.py +++ b/dags/fetch_notices_by_query.py @@ -1,5 +1,12 @@ +""" +This DAG is responsible for fetching notices from TED by query. +""" + +from datetime import date + from airflow.decorators import dag, task -from airflow.operators.dummy import DummyOperator +from airflow.models import Param +from airflow.operators.empty import EmptyOperator from airflow.operators.python import BranchPythonOperator from airflow.utils.trigger_rule import TriggerRule from dags import DEFAULT_DAG_ARGUMENTS @@ -22,7 +29,26 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, - tags=['fetch']) + tags=['fetch'], + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, + params={ + QUERY_DAG_KEY: Param( + default=f"PD=[>={date.today().strftime('%Y%m%d')} AND <={date.today().strftime('%Y%m%d')}]", + type="string", + title="Query", + description="""This field is required. + Query to fetch notices from TED.""" + ), + TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param( + default=True, + type="boolean", + title="Trigger Complete Workflow", + description="""This field is required. + If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered.""" + ) + } + ) def fetch_notices_by_query(): @task @event_log(TechnicalEventMessage( @@ -58,7 +84,7 @@ def _branch_selector(): python_callable=_branch_selector, ) - finish_step = DummyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID, + finish_step = EmptyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID, trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) fetch_by_query_notice_from_ted() >> branch_task >> [trigger_normalisation_workflow, diff --git a/dags/load_mapping_suite_in_database.py b/dags/load_mapping_suite_in_database.py index 944aa38c..c4dc27b9 100644 --- a/dags/load_mapping_suite_in_database.py +++ b/dags/load_mapping_suite_in_database.py @@ -1,5 +1,10 @@ +""" +This DAG is responsible for loading a mapping suite package into the database. +""" + from airflow.decorators import dag, task -from airflow.operators.dummy import DummyOperator +from airflow.models import Param +from airflow.operators.empty import EmptyOperator from airflow.operators.python import get_current_context, BranchPythonOperator from airflow.utils.trigger_rule import TriggerRule from pymongo import MongoClient @@ -30,7 +35,39 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, - tags=['fetch', 'mapping-suite', 'github']) + tags=['fetch', 'mapping-suite', 'github'], + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, + params={ + GITHUB_REPOSITORY_URL_DAG_PARAM_KEY: Param( + default=None, + type=["null", "string"], + title="Github repository url", + description="""This is optional field. + Github repository url to fetch mapping suite package from.""" + ), + BRANCH_OR_TAG_NAME_DAG_PARAM_KEY: Param( + default=None, + type=["null", "string"], + title="Branch or tag name", + description="""This is optional field. + Branch or tag name to fetch mapping suite package from.""" + ), + MAPPING_SUITE_PACKAGE_NAME_DAG_PARAM_KEY: Param( + default=None, + type=["null", "string"], + title="Mapping suite package name", + description="""This is optional field. + Mapping suite package name to fetch from github repository.""" + ), + LOAD_TEST_DATA_DAG_PARAM_KEY: Param( + default=False, + type="boolean", + title="Load test data", + description="""This field is used to load test data.""" + ) + } + ) def load_mapping_suite_in_database(): @task @event_log(is_loggable=False) @@ -77,7 +114,7 @@ def _branch_selector(): task_id=CHECK_IF_LOAD_TEST_DATA_TASK_ID, python_callable=_branch_selector, ) - finish_step = DummyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID, + finish_step = EmptyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID, trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID) diff --git a/dags/load_notices_in_fuseki.py b/dags/load_notices_in_fuseki.py index 0209df86..f1863a56 100644 --- a/dags/load_notices_in_fuseki.py +++ b/dags/load_notices_in_fuseki.py @@ -1,4 +1,9 @@ +""" +This DAG is used to load notices in Fuseki. +""" + from airflow.decorators import dag, task +from airflow.models import Param from pymongo import MongoClient from dags import DEFAULT_DAG_ARGUMENTS @@ -17,7 +22,24 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, - tags=['load', 'notices', 'fuseki']) + tags=['load', 'notices', 'fuseki'], + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, + params={ + FUSEKI_DATASET_NAME_DAG_PARAM_KEY: Param( + default=DEFAULT_FUSEKI_DATASET_NAME, + type="string", + title="Fuseki dataset name", + description="This field is used to specify the name of the dataset in Fuseki." + ), + NOTICE_STATUS_DAG_PARAM_KEY: Param( + default=str(NoticeStatus.PUBLISHED), + type="string", + title="Notice status", + description="This field is used to filter notices by status." + ) + } + ) def load_notices_in_fuseki(): @task def load_distilled_rdf_manifestations_in_fuseki(): diff --git a/dags/notice_processing_pipeline.py b/dags/notice_processing_pipeline.py index db365590..3e424bc4 100644 --- a/dags/notice_processing_pipeline.py +++ b/dags/notice_processing_pipeline.py @@ -1,3 +1,7 @@ +""" +This DAG is used to process notices in batch mode. +""" + from typing import List from airflow.operators.python import BranchPythonOperator, PythonOperator @@ -47,6 +51,8 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, max_active_runs=AIRFLOW_NUMBER_OF_WORKERS, max_active_tasks=1, tags=['worker', 'pipeline']) @@ -78,7 +84,7 @@ def _selector_branch_before_publish(): def _stop_processing(): notice_ids = smart_xcom_pull(key=NOTICE_IDS_KEY) if not notice_ids: - raise Exception(f"No notice has been processed!") + raise Exception("No notice has been processed!") start_processing = BranchPythonOperator( task_id=BRANCH_SELECTOR_TASK_ID, diff --git a/dags/reprocess_dag_params.py b/dags/reprocess_dag_params.py new file mode 100644 index 00000000..40bbcb1d --- /dev/null +++ b/dags/reprocess_dag_params.py @@ -0,0 +1,43 @@ +from airflow.models import Param + +FORM_NUMBER_DAG_PARAM = "form_number" +START_DATE_DAG_PARAM = "start_date" +END_DATE_DAG_PARAM = "end_date" +XSD_VERSION_DAG_PARAM = "xsd_version" + +REPROCESS_DATE_RANGE_DAG_PARAMS = { + START_DATE_DAG_PARAM: Param( + default=None, + type=["null", "string"], + format="date", + title="Start Date", + description="""This field is optional. If you want to filter notices by start date, please insert a date. + This field needs to be used with the end date field. + """ + ), + END_DATE_DAG_PARAM: Param( + default=None, + type=["null", "string"], + format="date", + title="End Date", + description="""This field is optional. If you want to filter notices by end date, please insert a date. + This field needs to be used with the start date field. + """, + ) +} + +REPROCESS_DAG_PARAMS = { + FORM_NUMBER_DAG_PARAM: Param( + default=None, + type=["null", "string"], + title="Form Number", + description="This field is optional. If you want to filter notices by form number, please insert a form number."), + **REPROCESS_DATE_RANGE_DAG_PARAMS, + XSD_VERSION_DAG_PARAM: Param( + default=None, + type=["null", "string"], + format="string", + title="XSD Version", + description="This field is optional. If you want to filter notices by XSD version, please insert a XSD version." + ) +} diff --git a/dags/reprocess_published_in_cellar_notices.py b/dags/reprocess_published_in_cellar_notices.py index 37a7e59d..1e37248d 100644 --- a/dags/reprocess_published_in_cellar_notices.py +++ b/dags/reprocess_published_in_cellar_notices.py @@ -1,11 +1,16 @@ +""" +This DAG is responsible for re-transforming notices that are in the cellar and are publicly available. +""" + from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ - EXECUTE_ONLY_ONE_STEP_KEY +from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status +from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \ + END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM from ted_sws.core.model.notice import NoticeStatus from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ @@ -15,15 +20,15 @@ RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.PUBLICLY_AVAILABLE] TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, - tags=['selector', 're-transform-publicly-available']) + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, + tags=['selector', 're-transform-publicly-available'], + params=REPROCESS_DAG_PARAMS + ) def reprocess_published_in_cellar_notices(): @task @event_log(TechnicalEventMessage( diff --git a/dags/reprocess_unnormalised_notices_from_backlog.py b/dags/reprocess_unnormalised_notices_from_backlog.py index 51005e1e..111cbf41 100644 --- a/dags/reprocess_unnormalised_notices_from_backlog.py +++ b/dags/reprocess_unnormalised_notices_from_backlog.py @@ -1,3 +1,7 @@ +""" +This DAG is used to reprocess all notices in RAW status from the backlog. +""" + from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param @@ -7,19 +11,20 @@ from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType +from dags.reprocess_dag_params import START_DATE_DAG_PARAM, END_DATE_DAG_PARAM, REPROCESS_DATE_RANGE_DAG_PARAMS DAG_NAME = "reprocess_unnormalised_notices_from_backlog" TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, - tags=['selector', 'raw-notices']) + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, + tags=['selector', 'raw-notices'], + params=REPROCESS_DATE_RANGE_DAG_PARAMS + ) def reprocess_unnormalised_notices_from_backlog(): @task @event_log(TechnicalEventMessage( diff --git a/dags/reprocess_unpackaged_notices_from_backlog.py b/dags/reprocess_unpackaged_notices_from_backlog.py index 75d37f4c..bef3e849 100644 --- a/dags/reprocess_unpackaged_notices_from_backlog.py +++ b/dags/reprocess_unpackaged_notices_from_backlog.py @@ -1,3 +1,7 @@ +""" +This DAG is responsible for re-processing notices that are not packaged. +""" + from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS @@ -9,6 +13,8 @@ from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType +from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \ + END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM DAG_NAME = "reprocess_unpackaged_notices_from_backlog" @@ -16,15 +22,15 @@ NoticeStatus.ELIGIBLE_FOR_PACKAGING, NoticeStatus.INELIGIBLE_FOR_PUBLISHING] TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, - tags=['selector', 're-package']) + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, + tags=['selector', 're-package'], + params=REPROCESS_DAG_PARAMS + ) def reprocess_unpackaged_notices_from_backlog(): @task @event_log(TechnicalEventMessage( diff --git a/dags/reprocess_unpublished_notices_from_backlog.py b/dags/reprocess_unpublished_notices_from_backlog.py index fa000143..ad9f567e 100644 --- a/dags/reprocess_unpublished_notices_from_backlog.py +++ b/dags/reprocess_unpublished_notices_from_backlog.py @@ -1,13 +1,18 @@ +""" +This DAG is used to re-publish notices. +""" + from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param from dags.notice_processing_pipeline import NOTICE_PUBLISH_PIPELINE_TASK_ID -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ - EXECUTE_ONLY_ONE_STEP_KEY +from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status from ted_sws.core.model.notice import NoticeStatus from ted_sws.event_manager.adapters.event_log_decorator import event_log +from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \ + END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType @@ -17,15 +22,15 @@ NoticeStatus.PACKAGED, NoticeStatus.PUBLICLY_UNAVAILABLE ] TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, - tags=['selector', 're-publish']) + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, + tags=['selector', 're-publish'], + params=REPROCESS_DAG_PARAMS + ) def reprocess_unpublished_notices_from_backlog(): @task @event_log(TechnicalEventMessage( diff --git a/dags/reprocess_untransformed_notices_from_backlog.py b/dags/reprocess_untransformed_notices_from_backlog.py index b93033e6..d65503ab 100644 --- a/dags/reprocess_untransformed_notices_from_backlog.py +++ b/dags/reprocess_untransformed_notices_from_backlog.py @@ -1,11 +1,16 @@ +""" +This DAG is used to re-transform notices. +""" + from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ - EXECUTE_ONLY_ONE_STEP_KEY +from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status +from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \ + END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM from ted_sws.core.model.notice import NoticeStatus from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ @@ -14,19 +19,20 @@ DAG_NAME = "reprocess_untransformed_notices_from_backlog" RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION, - NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION, + NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, + NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION, NoticeStatus.TRANSFORMED, NoticeStatus.DISTILLED ] TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, - tags=['selector', 're-transform']) + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, + tags=['selector', 're-transform'], + params=REPROCESS_DAG_PARAMS + ) def reprocess_untransformed_notices_from_backlog(): @task @event_log(TechnicalEventMessage( diff --git a/dags/reprocess_unvalidated_notices_from_backlog.py b/dags/reprocess_unvalidated_notices_from_backlog.py index 9f53f3f5..827b6410 100644 --- a/dags/reprocess_unvalidated_notices_from_backlog.py +++ b/dags/reprocess_unvalidated_notices_from_backlog.py @@ -1,11 +1,16 @@ +""" +This DAG is used to re-validate notices. +""" + from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ - EXECUTE_ONLY_ONE_STEP_KEY +from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status +from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \ + END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM from ted_sws.core.model.notice import NoticeStatus from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ @@ -15,15 +20,15 @@ RE_VALIDATE_TARGET_NOTICE_STATES = [NoticeStatus.DISTILLED] TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, - tags=['selector', 're-validate']) + description=__doc__[0: __doc__.find(".")], + doc_md=__doc__, + tags=['selector', 're-validate'], + params=REPROCESS_DAG_PARAMS + ) def reprocess_unvalidated_notices_from_backlog(): @task @event_log(TechnicalEventMessage(