Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
duprijil committed Oct 3, 2023
1 parent 56cbb7f commit b7d4389
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 5 deletions.
23 changes: 19 additions & 4 deletions dags/daily_notices_metadata_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@
DAG to update daily notices metadata from TED.
"""

from datetime import date
from datetime import date, datetime

from airflow.models import Param
from airflow.decorators import dag, task

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param
from ted_sws.data_manager.services.daily_notices_metadata_services import update_daily_notices_metadata_from_ted, \
update_daily_notices_metadata_with_fetched_data

START_DATE_PARAM_KEY = "start_date"
END_DATE_PARAM_KEY = "end_date"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['daily', "dashboards", "metadata", "ted", "notices"],
Expand All @@ -39,10 +42,22 @@
)
def daily_notices_metadata_update():
@task
def update_daily_notices_metadata_from_ted():
def update_daily_notices_metadata_from_ted_api():
start_date = get_dag_param(key=START_DATE_PARAM_KEY, raise_error=True)
end_date = get_dag_param(key=END_DATE_PARAM_KEY, raise_error=True)

update_daily_notices_metadata_from_ted(start_date=datetime.strptime(start_date, "%Y-%m-%d"),
end_date=datetime.strptime(end_date, "%Y-%m-%d"))

@task
def update_daily_notices_metadata_with_fetched_data_from_repo():
start_date = get_dag_param(key=START_DATE_PARAM_KEY, raise_error=True)
end_date = get_dag_param(key=END_DATE_PARAM_KEY, raise_error=True)

update_daily_notices_metadata_from_ted(start_date=start_date, end_date=end_date)
update_daily_notices_metadata_with_fetched_data(start_date=datetime.strptime(start_date, "%Y-%m-%d"),
end_date=datetime.strptime(end_date, "%Y-%m-%d"))

update_daily_notices_metadata_from_ted_api() >> update_daily_notices_metadata_with_fetched_data_from_repo()


update_daily_notices_metadata_from_ted()
dag = daily_notices_metadata_update()
52 changes: 51 additions & 1 deletion ted_sws/data_manager/services/daily_notices_metadata_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from pymongo import MongoClient

from ted_sws import config
from ted_sws.core.model.notice import Notice, NoticeStatus
from ted_sws.core.model.supra_notice import DailyNoticesMetadata
from ted_sws.data_manager.adapters.daily_notices_metadata_repository import DailyNoticesMetadataRepository
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI

DEFAULT_TED_API_START_DATE = "2023-09-29" # TODO: Change to 2014-01-01
Expand Down Expand Up @@ -57,7 +59,7 @@ def update_daily_notices_metadata_from_ted(start_date: date = None,

# Getting from metadata repository dates that are not in the repository from date range
dates_not_in_repository = [day for day in date_range if
not daily_notices_metadata_repo.list_daily_notices_metadata_aggregation_date()]
day not in daily_notices_metadata_repo.list_daily_notices_metadata_aggregation_date()]

# Getting from TED API dates that are not in the repository from date range
# TODO: If in ted are 0 notices, coverage is 1 to all
Expand All @@ -70,3 +72,51 @@ def update_daily_notices_metadata_from_ted(start_date: date = None,
daily_notices_metadata = DailyNoticesMetadata(aggregation_date=day)
daily_notices_metadata.ted_api_notice_ids = [notice[TED_API_NOTICE_ID_FIELD] for notice in notice_ids]
daily_notices_metadata_repo.add(daily_notices_metadata)


def update_daily_notices_metadata_with_fetched_data(start_date: date = None,
                                                    end_date: date = None,
                                                    ted_api: TedAPIAdapter = None,
                                                    mongo_client: MongoClient = None,
                                                    daily_notices_metadata_repo: DailyNoticesMetadataRepository = None):
    """
    Updates the daily notices metadata with fetched data.

    For every day in the [start_date, end_date] range, the stored daily metadata is loaded, each
    notice id recorded from the TED API is looked up in the notice repository, and the per-day
    aggregates (status counters, mapping suite packages, fetched notice ids) are written back.

    :param start_date: first day to process; defaults to DEFAULT_TED_API_START_DATE.
    :param end_date: last day to process; defaults to yesterday.
    :param ted_api: not used by this function; kept in the signature so existing callers keep working.
    :param mongo_client: MongoDB client; defaults to a client built from config.MONGO_DB_AUTH_URL.
    :param daily_notices_metadata_repo: repository of daily metadata; defaults to one built on mongo_client.
    :raises Exception: if start_date is greater than end_date.
    """

    # NOTE(review): the defaults are datetime objects while the annotations say date;
    # callers should pass the same kind of object for both bounds -- confirm.
    start_date = start_date or datetime.strptime(DEFAULT_TED_API_START_DATE, DEFAULT_TED_API_START_DATE_FORMAT)
    end_date = end_date or datetime.today() - timedelta(days=1)

    if start_date > end_date:
        raise Exception("Start date cannot be greater than end date")

    mongo_client = mongo_client or MongoClient(config.MONGO_DB_AUTH_URL)
    daily_notices_metadata_repo = daily_notices_metadata_repo or DailyNoticesMetadataRepository(mongo_client)
    notice_repo = NoticeRepository(mongo_client)

    # Generate list of dates from date range
    date_range = generate_list_of_dates_from_date_range(start_date, end_date)

    for day in date_range:
        daily_notices_metadata = daily_notices_metadata_repo.get(day)
        if daily_notices_metadata is None:
            # Nothing stored for this day (presumably not yet populated from TED), so nothing to update.
            continue

        # Per-day accumulators: they must be created once per day, before iterating the notices,
        # otherwise every notice would reset the counters collected so far.
        notice_statuses = {notice_status: 0 for notice_status in daily_notices_metadata.notice_statuses}
        mapping_suite_packages = []
        fetched_notice_ids = []

        for notice_id in daily_notices_metadata.ted_api_notice_ids:
            notice: Notice = notice_repo.get(notice_id)
            if not notice:
                # The notice was published on TED but is not in the notice repository.
                continue

            fetched_notice_ids.append(notice_id)
            notice_status = notice.status
            # NOTE(review): assumes the keys of notice_statuses are comparable to notice.status -- confirm.
            notice_statuses[notice_status] += 1
            if notice_status >= NoticeStatus.TRANSFORMED:  # Having distilled_rdf_manifestation
                mapping_suite_packages.append(notice.rdf_manifestation.mapping_suite_id)

        daily_notices_metadata.notice_statuses = notice_statuses
        daily_notices_metadata.mapping_suite_packages = mapping_suite_packages
        daily_notices_metadata.fetched_notice_ids = fetched_notice_ids

        daily_notices_metadata_repo.update(daily_notices_metadata)

0 comments on commit b7d4389

Please sign in to comment.