From 6337ac971704eafc1411502d2f9fdc8a80cb287e Mon Sep 17 00:00:00 2001
From: CaptainOfHacks <39195263+CaptainOfHacks@users.noreply.github.com>
Date: Fri, 23 Feb 2024 20:45:08 +0200
Subject: [PATCH] add DAGs for eForms fetching and indexing

---
 dags/eforms_notices/__init__.py               |  0
 .../fetch_eform_notices_by_date.py            | 93 ++++++++++++++++++++++++++
 .../fetch_eform_notices_by_date_range.py      | 68 +++++++++++++++++++
 3 files changed, 161 insertions(+)
 create mode 100644 dags/eforms_notices/__init__.py
 create mode 100644 dags/eforms_notices/fetch_eform_notices_by_date.py
 create mode 100644 dags/eforms_notices/fetch_eform_notices_by_date_range.py

diff --git a/dags/eforms_notices/__init__.py b/dags/eforms_notices/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/dags/eforms_notices/fetch_eform_notices_by_date.py b/dags/eforms_notices/fetch_eform_notices_by_date.py
new file mode 100644
index 000000000..1c7edab76
--- /dev/null
+++ b/dags/eforms_notices/fetch_eform_notices_by_date.py
@@ -0,0 +1,93 @@
+from datetime import datetime, timedelta
+
+from airflow.decorators import dag, task
+from pymongo import MongoClient
+
+from dags import DEFAULT_DAG_ARGUMENTS
+from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream
+from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY
+from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_query_pipeline
+from ted_sws import config
+from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
+from ted_sws.data_sampler.services.notice_xml_indexer import index_eforms_notice
+from ted_sws.event_manager.adapters.event_log_decorator import event_log
+from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
+    EventMessageProcessType
+from ted_sws.event_manager.services.log import log_error
+
+# NOTE(review): this is not the id of the DAG defined below (the function name,
+# fetch_and_index_eforms_notices_by_date); it is what the event logs report as process_name - confirm.
+DAG_NAME = "fetch_notices_by_date"
+BATCH_SIZE = 2000
+WILD_CARD_DAG_KEY = "wild_card"
+TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
+TRIGGER_PARTIAL_WORKFLOW_TASK_ID = "trigger_partial_notice_proc_workflow"
+TRIGGER_COMPLETE_WORKFLOW_TASK_ID = "trigger_complete_notice_proc_workflow"
+CHECK_IF_TRIGGER_COMPLETE_WORKFLOW_TASK_ID = "check_if_trigger_complete_workflow"
+FINISH_FETCH_BY_DATE_TASK_ID = "finish_fetch_by_date"
+VALIDATE_FETCHED_NOTICES_TASK_ID = "validate_fetched_notices"
+
+
+@dag(default_args=DEFAULT_DAG_ARGUMENTS,
+     catchup=False, tags=['selector', 'daily-fetch'])
+def fetch_and_index_eforms_notices_by_date():
+    """
+    Fetches from TED the eForms notices matching a date wildcard and then indexes them
+    """
+
+    @task
+    @event_log(TechnicalEventMessage(
+        message="fetch_eforms_notice_from_ted",
+        metadata=EventMessageMetadata(
+            process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
+        ))
+    )
+    def fetch_by_date_notice_from_ted():
+        """
+        Runs the eForms query for the requested date wildcard and pushes the fetched notice ids downstream
+        """
+        date_wildcard = get_dag_param(key=WILD_CARD_DAG_KEY)
+        if not date_wildcard:
+            # A run started without configuration (e.g. by the scheduler) has no wildcard; without a
+            # fallback the query would literally contain "PD=None". Default to yesterday.
+            date_wildcard = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d*")
+        query = f"""TD NOT IN (C E G I D P M Q O R 0 1 2 3 4 5 6 7 8 9 B S Y V F A H J K) AND
+        notice-subtype IN (10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24) AND FT~"eforms-sdk-" AND
+        PD={date_wildcard}
+        """
+        notice_ids = notice_fetcher_by_query_pipeline(query=query)
+        if not notice_ids:
+            log_error("No notices has been fetched!")
+        else:
+            push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)
+
+    @task
+    @event_log(TechnicalEventMessage(
+        message="index_eforms_notices_by_date",
+        metadata=EventMessageMetadata(
+            process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
+        ))
+    )
+    def index_eforms_notices_by_date():
+        """
+        Indexes every notice whose id was pushed by the fetch task and saves it back to the repository
+        """
+        notice_ids = pull_dag_upstream(key=NOTICE_IDS_KEY)
+        if not notice_ids:
+            # The fetch task pushes nothing when no notice was fetched, so there may be nothing to iterate.
+            return
+        mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
+        try:
+            notice_repository = NoticeRepository(mongodb_client)
+            for notice_id in notice_ids:
+                notice = notice_repository.get(notice_id)
+                indexed_notice = index_eforms_notice(notice)
+                notice_repository.update(indexed_notice)
+        finally:
+            # Close the client even when indexing one of the notices fails.
+            mongodb_client.close()
+
+    fetch_by_date_notice_from_ted() >> index_eforms_notices_by_date()
+
+
+dag = fetch_and_index_eforms_notices_by_date()
diff --git a/dags/eforms_notices/fetch_eform_notices_by_date_range.py b/dags/eforms_notices/fetch_eform_notices_by_date_range.py
new file mode 100644
index 000000000..63e612df2
--- /dev/null
+++ b/dags/eforms_notices/fetch_eform_notices_by_date_range.py
@@ -0,0 +1,68 @@
+from datetime import datetime
+from typing import Any
+from dateutil import rrule
+
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+
+from dags import DEFAULT_DAG_ARGUMENTS
+from dags.dags_utils import get_dag_param
+from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY
+from ted_sws.event_manager.adapters.event_log_decorator import event_log
+from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
+    EventMessageProcessType
+
+# NOTE(review): this is not the id of the DAG defined below (the function name,
+# fetch_eforms_notices_by_date_range); it is what the event logs report as process_name - confirm.
+DAG_NAME = "fetch_notices_by_date_range"
+
+START_DATE_KEY = "start_date"
+END_DATE_KEY = "end_date"
+
+
+def generate_wildcards_foreach_day_in_range(start_date: str, end_date: str) -> list:
+    """
+    Given a date range returns a wildcard for each day in that range, both ends included
+    :param start_date: first day of the range, in the YYYYMMDD format
+    :param end_date: last day of the range, in the YYYYMMDD format
+    :return: the 'YYYYMMDD*' wildcards in chronological order, empty if start_date is after end_date
+    """
+    return [dt.strftime('%Y%m%d*')
+            for dt in rrule.rrule(rrule.DAILY,
+                                  dtstart=datetime.strptime(start_date, '%Y%m%d'),
+                                  until=datetime.strptime(end_date, '%Y%m%d'))]
+
+
+@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'])
+def fetch_eforms_notices_by_date_range():
+    """
+    Triggers the fetch_and_index_eforms_notices_by_date DAG once for each day of the requested date range
+    """
+
+    @task
+    @event_log(TechnicalEventMessage(
+        message="trigger_fetch_notices_workers_for_date_range",
+        metadata=EventMessageMetadata(
+            process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
+        ))
+    )
+    def trigger_notice_by_date_for_each_date_in_range():
+        """
+        Reads start_date and end_date from the DAG run parameters and triggers one run per day
+        """
+        context: Any = get_current_context()
+        start_date = get_dag_param(key=START_DATE_KEY, raise_error=True)
+        end_date = get_dag_param(key=END_DATE_KEY, raise_error=True)
+        date_wildcards = generate_wildcards_foreach_day_in_range(start_date, end_date)
+        for date_wildcard in date_wildcards:
+            TriggerDagRunOperator(
+                task_id=f'trigger_eforms_notice_fetch_by_date_workflow_dag_{date_wildcard[:-1]}',
+                trigger_dag_id="fetch_and_index_eforms_notices_by_date",
+                conf={WILD_CARD_DAG_KEY: date_wildcard}
+            ).execute(context=context)
+
+    trigger_notice_by_date_for_each_date_in_range()
+
+
+dag = fetch_eforms_notices_by_date_range()