From 6d0b7b70c948d06e7037381a6e7808c5f08ddcc3 Mon Sep 17 00:00:00 2001 From: CaptainOfHacks <39195263+CaptainOfHacks@users.noreply.github.com> Date: Mon, 18 Sep 2023 20:06:59 +0300 Subject: [PATCH] Update fetch_notices_by_date.py --- dags/fetch_notices_by_date.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/dags/fetch_notices_by_date.py b/dags/fetch_notices_by_date.py index f872dc8d..61879425 100644 --- a/dags/fetch_notices_by_date.py +++ b/dags/fetch_notices_by_date.py @@ -1,6 +1,7 @@ -from datetime import timedelta +from datetime import date, datetime from airflow.decorators import dag, task +from airflow.models import Param from airflow.operators.dummy import DummyOperator from airflow.operators.python import BranchPythonOperator, PythonOperator from airflow.utils.trigger_rule import TriggerRule @@ -29,7 +30,25 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, catchup=False, timetable=CronTriggerTimetable('0 1 * * *', timezone='UTC'), - tags=['selector', 'daily-fetch']) + tags=['selector', 'daily-fetch'], + params={ + WILD_CARD_DAG_KEY: Param( + default=f"{date.today()}", + type="string", + format="date", + title="Date", + description="""This field is required. + Date to fetch notices from TED.""" + ), + TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param( + default=True, + type="boolean", + title="Trigger Complete Workflow", + description="""This field is required. + If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered.""" + ) + } + ) def fetch_notices_by_date(): @task @event_log(TechnicalEventMessage( @@ -39,7 +58,9 @@ def fetch_notices_by_date(): )) ) def fetch_by_date_notice_from_ted(): - notice_ids = notice_fetcher_by_date_pipeline(date_wild_card=get_dag_param(key=WILD_CARD_DAG_KEY)) + selected_date = datetime.strptime(get_dag_param(key=WILD_CARD_DAG_KEY, raise_error=True), "%Y-%m-%d") + date_wild_card = datetime.strftime(selected_date, "%Y%m%d*") + notice_ids = notice_fetcher_by_date_pipeline(date_wild_card=date_wild_card) if not notice_ids: log_error("No notices has been fetched!") else: @@ -62,9 +83,7 @@ def validate_fetched_notices(): from datetime import datetime from pymongo import MongoClient - publication_date = datetime.strptime(get_dag_param(key=WILD_CARD_DAG_KEY, - default_value=(datetime.now() - timedelta(days=1)).strftime( - "%Y%m%d*")), "%Y%m%d*") + publication_date = datetime.strptime(get_dag_param(key=WILD_CARD_DAG_KEY), "%Y-%m-%d") mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) validate_and_update_daily_supra_notice(ted_publication_date=publication_date, mongodb_client=mongodb_client)