Skip to content

Commit

Permalink
make start picking at dag level
Browse files Browse the repository at this point in the history
  • Loading branch information
duprijil committed Oct 6, 2023
1 parent da1977c commit b67146b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 21 deletions.
24 changes: 19 additions & 5 deletions dags/daily_notices_metadata_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
DAG to update daily notices metadata from TED.
"""

from datetime import date, datetime
from datetime import date, datetime, timedelta

from airflow.decorators import dag, task
from airflow.models import Param
Expand All @@ -16,6 +16,8 @@

START_DATE_PARAM_KEY = "start_date"
END_DATE_PARAM_KEY = "end_date"
DEFAULT_TED_API_START_DATE = "2014-01-01"
DEFAULT_TED_API_START_DATE_FORMAT = "%Y-%m-%d"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
Expand Down Expand Up @@ -50,16 +52,28 @@ def update_daily_notices_metadata_from_ted_api():
start_date = get_dag_param(key=START_DATE_PARAM_KEY, raise_error=False)
end_date = get_dag_param(key=END_DATE_PARAM_KEY, raise_error=False)

update_daily_notices_metadata_from_ted(start_date=datetime.strptime(start_date, "%Y-%m-%d"),
end_date=datetime.strptime(end_date, "%Y-%m-%d"))
if start_date and end_date:
start_date = datetime.strptime(start_date, "%Y-%m-%d")
end_date = datetime.strptime(end_date, "%Y-%m-%d")
else:
start_date = datetime.strptime(DEFAULT_TED_API_START_DATE, DEFAULT_TED_API_START_DATE_FORMAT)
end_date = datetime.today() - timedelta(days=1)

update_daily_notices_metadata_from_ted(start_date=start_date, end_date=end_date)

@task
def update_daily_notices_metadata_with_fetched_data_from_repo():
start_date = get_dag_param(key=START_DATE_PARAM_KEY, raise_error=False)
end_date = get_dag_param(key=END_DATE_PARAM_KEY, raise_error=False)

update_daily_notices_metadata_with_fetched_data(start_date=datetime.strptime(start_date, "%Y-%m-%d"),
end_date=datetime.strptime(end_date, "%Y-%m-%d"))
if start_date and end_date:
start_date = datetime.strptime(start_date, "%Y-%m-%d")
end_date = datetime.strptime(end_date, "%Y-%m-%d")
else:
start_date = datetime.strptime(DEFAULT_TED_API_START_DATE, DEFAULT_TED_API_START_DATE_FORMAT)
end_date = datetime.today() - timedelta(days=1)

update_daily_notices_metadata_with_fetched_data(start_date=start_date, end_date=end_date)

update_daily_notices_metadata_from_ted_api() >> update_daily_notices_metadata_with_fetched_data_from_repo()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import date, datetime, timedelta
from datetime import date
from typing import Optional, List

from dateutil import rrule
Expand All @@ -11,8 +11,6 @@
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI

DEFAULT_TED_API_START_DATE = "2014-01-01"
DEFAULT_TED_API_START_DATE_FORMAT = "%Y-%m-%d"
TED_API_NOTICE_ID_FIELD = "ND"
TED_API_WILDCARD_DATE_FORMAT = "%Y%m%d*"
DAILY_NOTICES_METADATA_TED_API_QUERY_RESULT_FIELDS = {"fields": ["ND"]}
Expand Down Expand Up @@ -57,20 +55,18 @@ def generate_list_of_dates_from_date_range(start_date: date, end_date: date) ->
"""
if start_date > end_date:
return None
return [ dt.date() for dt in rrule.rrule(rrule.DAILY,
dtstart=start_date,
until=end_date)]
return [dt.date() for dt in rrule.rrule(rrule.DAILY,
dtstart=start_date,
until=end_date)]


def update_daily_notices_metadata_from_ted(start_date: date = None,
end_date: date = None,
def update_daily_notices_metadata_from_ted(start_date: date,
end_date: date,
ted_api: TedAPIAdapter = None,
daily_notices_metadata_repo: DailyNoticesMetadataRepository = None):
"""
Updates the daily notices metadata from the TED API.
"""
start_date = start_date or datetime.strptime(DEFAULT_TED_API_START_DATE, DEFAULT_TED_API_START_DATE_FORMAT)
end_date = end_date or datetime.today() - timedelta(days=1)

if start_date > end_date:
raise Exception("Start date cannot be greater than end date")
Expand All @@ -92,23 +88,21 @@ def update_daily_notices_metadata_from_ted(start_date: date = None,
ted_api_query = DAILY_NOTICES_METADATA_TED_API_QUERY.copy()
ted_api_query[TED_API_QUERY_FIELD] = ted_api_query[TED_API_QUERY_FIELD].format(
aggregation_date=day.strftime(TED_API_WILDCARD_DATE_FORMAT))
notice_ids = ted_api.get_by_query(ted_api_query, result_fields=DAILY_NOTICES_METADATA_TED_API_QUERY_RESULT_FIELDS)
notice_ids = ted_api.get_by_query(ted_api_query,
result_fields=DAILY_NOTICES_METADATA_TED_API_QUERY_RESULT_FIELDS)
daily_notices_metadata = DailyNoticesMetadata(aggregation_date=day)
daily_notices_metadata.ted_api_notice_ids = [notice[TED_API_NOTICE_ID_FIELD] for notice in notice_ids]
daily_notices_metadata_repo.add(daily_notices_metadata)


def update_daily_notices_metadata_with_fetched_data(start_date: date = None,
end_date: date = None,
def update_daily_notices_metadata_with_fetched_data(start_date: date,
end_date: date,
notice_repo: NoticeRepository = None,
daily_notices_metadata_repo: DailyNoticesMetadataRepository = None):
"""
Updates the daily notices metadata with fetched data.
"""

start_date = start_date or datetime.strptime(DEFAULT_TED_API_START_DATE, DEFAULT_TED_API_START_DATE_FORMAT)
end_date = end_date or datetime.today() - timedelta(days=1)

if start_date > end_date:
raise Exception("Start date cannot be greater than end date")

Expand Down

0 comments on commit b67146b

Please sign in to comment.