diff --git a/dags/fetch_notices_by_date_range.py b/dags/fetch_notices_by_date_range.py
index 4c4f0e6c..3d2e7ebf 100644
--- a/dags/fetch_notices_by_date_range.py
+++ b/dags/fetch_notices_by_date_range.py
@@ -1,5 +1,7 @@
-from datetime import datetime
+from datetime import datetime, date
 from typing import Any
+
+from airflow.models import Param
 from dateutil import rrule
 
 from airflow.decorators import dag, task
@@ -29,11 +31,37 @@ def generate_wildcards_foreach_day_in_range(start_date: str, end_date: str) -> list:
     """
     return [dt.strftime('%Y%m%d*') for dt in rrule.rrule(rrule.DAILY,
-                                                         dtstart=datetime.strptime(start_date, '%Y%m%d'),
-                                                         until=datetime.strptime(end_date, '%Y%m%d'))]
+                                                         dtstart=datetime.strptime(start_date, '%Y-%m-%d'),
+                                                         until=datetime.strptime(end_date, '%Y-%m-%d'))]
 
 
-@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'])
+@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'],
+     params={
+         START_DATE_KEY: Param(
+             default=f"{date.today()}",
+             type="string",
+             format="date",
+             title="Start Date",
+             description="""This field is required.
+             Start date of the date range to fetch notices from TED."""
+         ),
+         END_DATE_KEY: Param(
+             default=f"{date.today()}",
+             type="string",
+             format="date",
+             title="End Date",
+             description="""This field is required.
+             End date of the date range to fetch notices from TED."""
+         ),
+         TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param(
+             default=False,
+             type="boolean",
+             title="Trigger Complete Workflow",
+             description="""This field is required.
+             If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
+         )
+     }
+     )
 def fetch_notices_by_date_range():
     @task
     @event_log(TechnicalEventMessage(
@@ -46,7 +74,8 @@ def trigger_notice_by_date_for_each_date_in_range():
         context: Any = get_current_context()
         start_date = get_dag_param(key=START_DATE_KEY, raise_error=True)
         end_date = get_dag_param(key=END_DATE_KEY, raise_error=True)
-        trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, default_value=False)
+        trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY,
+                                                  default_value=False)
         date_wildcards = generate_wildcards_foreach_day_in_range(start_date, end_date)
         for date_wildcard in date_wildcards:
             TriggerDagRunOperator(
diff --git a/dags/fetch_notices_by_query.py b/dags/fetch_notices_by_query.py
index 1c019acf..8cf2c19f 100644
--- a/dags/fetch_notices_by_query.py
+++ b/dags/fetch_notices_by_query.py
@@ -1,4 +1,5 @@
 from airflow.decorators import dag, task
+from airflow.models import Param
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import BranchPythonOperator
 from airflow.utils.trigger_rule import TriggerRule
@@ -22,7 +23,24 @@
 
 @dag(default_args=DEFAULT_DAG_ARGUMENTS,
      schedule_interval=None,
-     tags=['fetch'])
+     tags=['fetch'],
+     params={
+         QUERY_DAG_KEY: Param(
+             default=None,
+             type="string",
+             title="Query",
+             description="""This field is required.
+             Query to fetch notices from TED."""
+         ),
+         TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param(
+             default=True,
+             type="boolean",
+             title="Trigger Complete Workflow",
+             description="""This field is required.
+             If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
+         )
+     }
+     )
 def fetch_notices_by_query():
     @task
     @event_log(TechnicalEventMessage(
diff --git a/dags/load_mapping_suite_in_database.py b/dags/load_mapping_suite_in_database.py
index 944aa38c..b4f8f7d0 100644
--- a/dags/load_mapping_suite_in_database.py
+++ b/dags/load_mapping_suite_in_database.py
@@ -1,5 +1,6 @@
 from airflow.decorators import dag, task
-from airflow.operators.dummy import DummyOperator
+from airflow.models import Param
+from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import get_current_context, BranchPythonOperator
 from airflow.utils.trigger_rule import TriggerRule
 from pymongo import MongoClient
@@ -30,7 +31,37 @@
 
 @dag(default_args=DEFAULT_DAG_ARGUMENTS,
      schedule_interval=None,
-     tags=['fetch', 'mapping-suite', 'github'])
+     tags=['fetch', 'mapping-suite', 'github'],
+     params={
+         GITHUB_REPOSITORY_URL_DAG_PARAM_KEY: Param(
+             default=None,
+             type=["null", "string"],
+             title="Github repository url",
+             description="""This is an optional field.
+             Github repository url to fetch mapping suite package from."""
+         ),
+         BRANCH_OR_TAG_NAME_DAG_PARAM_KEY: Param(
+             default=None,
+             type=["null", "string"],
+             title="Branch or tag name",
+             description="""This is an optional field.
+             Branch or tag name to fetch mapping suite package from."""
+         ),
+         MAPPING_SUITE_PACKAGE_NAME_DAG_PARAM_KEY: Param(
+             default=None,
+             type=["null", "string"],
+             title="Mapping suite package name",
+             description="""This is an optional field.
+             Mapping suite package name to fetch from github repository."""
+         ),
+         LOAD_TEST_DATA_DAG_PARAM_KEY: Param(
+             default=False,
+             type="boolean",
+             title="Load test data",
+             description="""This field is used to load test data."""
+         )
+     }
+     )
 def load_mapping_suite_in_database():
     @task
     @event_log(is_loggable=False)
@@ -77,7 +108,7 @@ def _branch_selector():
         task_id=CHECK_IF_LOAD_TEST_DATA_TASK_ID,
         python_callable=_branch_selector,
     )
-    finish_step = DummyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID,
+    finish_step = EmptyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID,
                                 trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
 
     trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID)
diff --git a/dags/load_notices_in_fuseki.py b/dags/load_notices_in_fuseki.py
index 0209df86..c3abadab 100644
--- a/dags/load_notices_in_fuseki.py
+++ b/dags/load_notices_in_fuseki.py
@@ -1,4 +1,5 @@
 from airflow.decorators import dag, task
+from airflow.models import Param
 from pymongo import MongoClient
 
 from dags import DEFAULT_DAG_ARGUMENTS
@@ -17,7 +18,22 @@
 
 @dag(default_args=DEFAULT_DAG_ARGUMENTS,
      schedule_interval=None,
-     tags=['load', 'notices', 'fuseki'])
+     tags=['load', 'notices', 'fuseki'],
+     params={
+         FUSEKI_DATASET_NAME_DAG_PARAM_KEY: Param(
+             default=DEFAULT_FUSEKI_DATASET_NAME,
+             type="string",
+             title="Fuseki dataset name",
+             description="This field is used to specify the name of the dataset in Fuseki."
+         ),
+         NOTICE_STATUS_DAG_PARAM_KEY: Param(
+             default=str(NoticeStatus.PUBLISHED),
+             type="string",
+             title="Notice status",
+             description="This field is used to filter notices by status."
+         )
+     }
+     )
 def load_notices_in_fuseki():
     @task
     def load_distilled_rdf_manifestations_in_fuseki():
diff --git a/dags/notice_processing_pipeline.py b/dags/notice_processing_pipeline.py
index db365590..a92b72af 100644
--- a/dags/notice_processing_pipeline.py
+++ b/dags/notice_processing_pipeline.py
@@ -78,7 +78,7 @@ def _selector_branch_before_publish():
     def _stop_processing():
         notice_ids = smart_xcom_pull(key=NOTICE_IDS_KEY)
         if not notice_ids:
-            raise Exception(f"No notice has been processed!")
+            raise Exception("No notice has been processed!")
 
     start_processing = BranchPythonOperator(
         task_id=BRANCH_SELECTOR_TASK_ID,
diff --git a/dags/reprocess_dag_params.py b/dags/reprocess_dag_params.py
new file mode 100644
index 00000000..40bbcb1d
--- /dev/null
+++ b/dags/reprocess_dag_params.py
@@ -0,0 +1,43 @@
+from airflow.models import Param
+
+FORM_NUMBER_DAG_PARAM = "form_number"
+START_DATE_DAG_PARAM = "start_date"
+END_DATE_DAG_PARAM = "end_date"
+XSD_VERSION_DAG_PARAM = "xsd_version"
+
+REPROCESS_DATE_RANGE_DAG_PARAMS = {
+    START_DATE_DAG_PARAM: Param(
+        default=None,
+        type=["null", "string"],
+        format="date",
+        title="Start Date",
+        description="""This field is optional. If you want to filter notices by start date, please insert a date.
+        This field needs to be used with the end date field.
+        """
+    ),
+    END_DATE_DAG_PARAM: Param(
+        default=None,
+        type=["null", "string"],
+        format="date",
+        title="End Date",
+        description="""This field is optional. If you want to filter notices by end date, please insert a date.
+        This field needs to be used with the start date field.
+        """,
+    )
+}
+
+REPROCESS_DAG_PARAMS = {
+    FORM_NUMBER_DAG_PARAM: Param(
+        default=None,
+        type=["null", "string"],
+        title="Form Number",
+        description="This field is optional. If you want to filter notices by form number, please insert a form number."),
+    **REPROCESS_DATE_RANGE_DAG_PARAMS,
+    XSD_VERSION_DAG_PARAM: Param(
+        default=None,
+        type=["null", "string"],
+        format="string",
+        title="XSD Version",
+        description="This field is optional. If you want to filter notices by XSD version, please insert an XSD version."
+    )
+}
diff --git a/dags/reprocess_published_in_cellar_notices.py b/dags/reprocess_published_in_cellar_notices.py
index 37a7e59d..605f7313 100644
--- a/dags/reprocess_published_in_cellar_notices.py
+++ b/dags/reprocess_published_in_cellar_notices.py
@@ -3,9 +3,10 @@
 from dags import DEFAULT_DAG_ARGUMENTS
 from dags.dags_utils import push_dag_downstream, get_dag_param
 from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
-from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
-    EXECUTE_ONLY_ONE_STEP_KEY
+from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
 from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
+from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
+    END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
 from ted_sws.core.model.notice import NoticeStatus
 from ted_sws.event_manager.adapters.event_log_decorator import event_log
 from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
@@ -15,15 +16,13 @@
 RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.PUBLICLY_AVAILABLE]
 TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
-FORM_NUMBER_DAG_PARAM = "form_number"
-START_DATE_DAG_PARAM = "start_date"
-END_DATE_DAG_PARAM = "end_date"
-XSD_VERSION_DAG_PARAM = "xsd_version"
 
 
 @dag(default_args=DEFAULT_DAG_ARGUMENTS,
      schedule_interval=None,
-     tags=['selector', 're-transform-publicly-available'])
+     tags=['selector', 're-transform-publicly-available'],
+     params=REPROCESS_DAG_PARAMS
+     )
 def reprocess_published_in_cellar_notices():
     @task
     @event_log(TechnicalEventMessage(
diff --git a/dags/reprocess_unnormalised_notices_from_backlog.py b/dags/reprocess_unnormalised_notices_from_backlog.py
index 51005e1e..5c7f7a6d 100644
--- a/dags/reprocess_unnormalised_notices_from_backlog.py
+++ b/dags/reprocess_unnormalised_notices_from_backlog.py
@@ -7,19 +7,18 @@
 from ted_sws.event_manager.adapters.event_log_decorator import event_log
 from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
     EventMessageProcessType
+from dags.reprocess_dag_params import START_DATE_DAG_PARAM, END_DATE_DAG_PARAM, REPROCESS_DATE_RANGE_DAG_PARAMS
 
 DAG_NAME = "reprocess_unnormalised_notices_from_backlog"
 TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
-FORM_NUMBER_DAG_PARAM = "form_number"
-START_DATE_DAG_PARAM = "start_date"
-END_DATE_DAG_PARAM = "end_date"
-XSD_VERSION_DAG_PARAM = "xsd_version"
 
 
 @dag(default_args=DEFAULT_DAG_ARGUMENTS,
      schedule_interval=None,
-     tags=['selector', 'raw-notices'])
+     tags=['selector', 'raw-notices'],
+     params=REPROCESS_DATE_RANGE_DAG_PARAMS
+     )
 def reprocess_unnormalised_notices_from_backlog():
     @task
     @event_log(TechnicalEventMessage(
diff --git a/dags/reprocess_unpackaged_notices_from_backlog.py b/dags/reprocess_unpackaged_notices_from_backlog.py
index 75d37f4c..f4b91b8c 100644
--- a/dags/reprocess_unpackaged_notices_from_backlog.py
+++ b/dags/reprocess_unpackaged_notices_from_backlog.py
@@ -9,6 +9,8 @@
 from ted_sws.event_manager.adapters.event_log_decorator import event_log
 from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
     EventMessageProcessType
+from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
+    END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
 
 
"reprocess_unpackaged_notices_from_backlog" @@ -16,15 +18,13 @@ NoticeStatus.ELIGIBLE_FOR_PACKAGING, NoticeStatus.INELIGIBLE_FOR_PUBLISHING] TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, - tags=['selector', 're-package']) + tags=['selector', 're-package'], + params=REPROCESS_DAG_PARAMS + ) def reprocess_unpackaged_notices_from_backlog(): @task @event_log(TechnicalEventMessage( diff --git a/dags/reprocess_unpublished_notices_from_backlog.py b/dags/reprocess_unpublished_notices_from_backlog.py index fa000143..3fe77fd3 100644 --- a/dags/reprocess_unpublished_notices_from_backlog.py +++ b/dags/reprocess_unpublished_notices_from_backlog.py @@ -3,11 +3,12 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param from dags.notice_processing_pipeline import NOTICE_PUBLISH_PIPELINE_TASK_ID -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ - EXECUTE_ONLY_ONE_STEP_KEY +from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status from ted_sws.core.model.notice import NoticeStatus from ted_sws.event_manager.adapters.event_log_decorator import event_log +from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \ + END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType @@ -17,15 +18,13 @@ NoticeStatus.PACKAGED, NoticeStatus.PUBLICLY_UNAVAILABLE ] TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, - tags=['selector', 're-publish']) + tags=['selector', 're-publish'], + params=REPROCESS_DAG_PARAMS + ) def reprocess_unpublished_notices_from_backlog(): @task @event_log(TechnicalEventMessage( diff --git a/dags/reprocess_untransformed_notices_from_backlog.py b/dags/reprocess_untransformed_notices_from_backlog.py index b93033e6..2bed3628 100644 --- a/dags/reprocess_untransformed_notices_from_backlog.py +++ b/dags/reprocess_untransformed_notices_from_backlog.py @@ -3,9 +3,10 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ - EXECUTE_ONLY_ONE_STEP_KEY +from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status +from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \ + END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM from ted_sws.core.model.notice import NoticeStatus from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ @@ -14,19 
@@ -14,19 +15,18 @@
 DAG_NAME = "reprocess_untransformed_notices_from_backlog"
 
 RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
-                                     NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
+                                     NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION,
+                                     NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
                                      NoticeStatus.TRANSFORMED, NoticeStatus.DISTILLED
                                      ]
 TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
-FORM_NUMBER_DAG_PARAM = "form_number"
-START_DATE_DAG_PARAM = "start_date"
-END_DATE_DAG_PARAM = "end_date"
-XSD_VERSION_DAG_PARAM = "xsd_version"
 
 
 @dag(default_args=DEFAULT_DAG_ARGUMENTS,
      schedule_interval=None,
-     tags=['selector', 're-transform'])
+     tags=['selector', 're-transform'],
+     params=REPROCESS_DAG_PARAMS
+     )
 def reprocess_untransformed_notices_from_backlog():
     @task
     @event_log(TechnicalEventMessage(
diff --git a/dags/reprocess_unvalidated_notices_from_backlog.py b/dags/reprocess_unvalidated_notices_from_backlog.py
index 9f53f3f5..39d398a8 100644
--- a/dags/reprocess_unvalidated_notices_from_backlog.py
+++ b/dags/reprocess_unvalidated_notices_from_backlog.py
@@ -3,9 +3,10 @@
 from dags import DEFAULT_DAG_ARGUMENTS
 from dags.dags_utils import push_dag_downstream, get_dag_param
 from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
-from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
-    EXECUTE_ONLY_ONE_STEP_KEY
+from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
 from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
+from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
+    END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
 from ted_sws.core.model.notice import NoticeStatus
 from ted_sws.event_manager.adapters.event_log_decorator import event_log
 from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
@@ -15,15 +16,13 @@
 RE_VALIDATE_TARGET_NOTICE_STATES = [NoticeStatus.DISTILLED]
 TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
-FORM_NUMBER_DAG_PARAM = "form_number"
-START_DATE_DAG_PARAM = "start_date"
-END_DATE_DAG_PARAM = "end_date"
-XSD_VERSION_DAG_PARAM = "xsd_version"
 
 
 @dag(default_args=DEFAULT_DAG_ARGUMENTS,
      schedule_interval=None,
-     tags=['selector', 're-validate'])
+     tags=['selector', 're-validate'],
+     params=REPROCESS_DAG_PARAMS
+     )
 def reprocess_unvalidated_notices_from_backlog():
     @task
     @event_log(TechnicalEventMessage(