Skip to content

Commit

Permalink
Update fetch_notices_by_date.py
Browse files Browse the repository at this point in the history
  • Loading branch information
CaptainOfHacks committed Sep 18, 2023
1 parent 61120b5 commit 6d0b7b7
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions dags/fetch_notices_by_date.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import timedelta
from datetime import date, datetime

from airflow.decorators import dag, task
from airflow.models import Param
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule
Expand Down Expand Up @@ -29,7 +30,25 @@
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
catchup=False,
timetable=CronTriggerTimetable('0 1 * * *', timezone='UTC'),
tags=['selector', 'daily-fetch'])
tags=['selector', 'daily-fetch'],
params={
WILD_CARD_DAG_KEY: Param(
default=f"{date.today()}",
type="string",
format="date",
title="Date",
description="""This field is required.
Date to fetch notices from TED."""
),
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param(
default=True,
type="boolean",
title="Trigger Complete Workflow",
description="""This field is required.
If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
)
}
)
def fetch_notices_by_date():
@task
@event_log(TechnicalEventMessage(
Expand All @@ -39,7 +58,9 @@ def fetch_notices_by_date():
))
)
def fetch_by_date_notice_from_ted():
notice_ids = notice_fetcher_by_date_pipeline(date_wild_card=get_dag_param(key=WILD_CARD_DAG_KEY))
selected_date = datetime.strptime(get_dag_param(key=WILD_CARD_DAG_KEY, raise_error=True), "%Y-%m-%d")
date_wild_card = datetime.strftime(selected_date, "%Y%m%d*")
notice_ids = notice_fetcher_by_date_pipeline(date_wild_card=date_wild_card)
if not notice_ids:
log_error("No notices has been fetched!")
else:
Expand All @@ -62,9 +83,7 @@ def validate_fetched_notices():
from datetime import datetime
from pymongo import MongoClient

publication_date = datetime.strptime(get_dag_param(key=WILD_CARD_DAG_KEY,
default_value=(datetime.now() - timedelta(days=1)).strftime(
"%Y%m%d*")), "%Y%m%d*")
publication_date = datetime.strptime(get_dag_param(key=WILD_CARD_DAG_KEY), "%Y-%m-%d")
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
validate_and_update_daily_supra_notice(ted_publication_date=publication_date, mongodb_client=mongodb_client)

Expand Down

0 comments on commit 6d0b7b7

Please sign in to comment.