From b7d4389c17bef4505f2dd5824cb941294edc4fd3 Mon Sep 17 00:00:00 2001 From: Dumitru Date: Tue, 3 Oct 2023 15:03:37 +0300 Subject: [PATCH] WIP --- dags/daily_notices_metadata_update.py | 23 ++++++-- .../daily_notices_metadata_services.py | 52 ++++++++++++++++++- 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/dags/daily_notices_metadata_update.py b/dags/daily_notices_metadata_update.py index eefc94cf..9acc1bd7 100644 --- a/dags/daily_notices_metadata_update.py +++ b/dags/daily_notices_metadata_update.py @@ -2,17 +2,20 @@ DAG to update daily notices metadata from TED. """ -from datetime import date +from datetime import date, datetime from airflow.models import Param from airflow.decorators import dag, task from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import get_dag_param +from ted_sws.data_manager.services.daily_notices_metadata_services import update_daily_notices_metadata_from_ted, \ + update_daily_notices_metadata_with_fetched_data START_DATE_PARAM_KEY = "start_date" END_DATE_PARAM_KEY = "end_date" + @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['daily', "dashboards", "metadata", "ted", "notices"], @@ -39,10 +42,22 @@ ) def daily_notices_metadata_update(): @task - def update_daily_notices_metadata_from_ted(): + def update_daily_notices_metadata_from_ted_api(): + start_date = get_dag_param(key=START_DATE_PARAM_KEY, raise_error=True) + end_date = get_dag_param(key=END_DATE_PARAM_KEY, raise_error=True) + + update_daily_notices_metadata_from_ted(start_date=datetime.strptime(start_date, "%Y-%m-%d"), + end_date=datetime.strptime(end_date, "%Y-%m-%d")) + + @task + def update_daily_notices_metadata_with_fetched_data_from_repo(): start_date = get_dag_param(key=START_DATE_PARAM_KEY, raise_error=True) end_date = get_dag_param(key=END_DATE_PARAM_KEY, raise_error=True) - update_daily_notices_metadata_from_ted(start_date=start_date, end_date=end_date) + update_daily_notices_metadata_with_fetched_data(start_date=datetime.strptime(start_date, "%Y-%m-%d"), + end_date=datetime.strptime(end_date, "%Y-%m-%d")) + + update_daily_notices_metadata_from_ted_api() >> update_daily_notices_metadata_with_fetched_data_from_repo() + - update_daily_notices_metadata_from_ted() \ No newline at end of file +dag = daily_notices_metadata_update() diff --git a/ted_sws/data_manager/services/daily_notices_metadata_services.py b/ted_sws/data_manager/services/daily_notices_metadata_services.py index 426e57d0..3c00feb7 100644 --- a/ted_sws/data_manager/services/daily_notices_metadata_services.py +++ b/ted_sws/data_manager/services/daily_notices_metadata_services.py @@ -5,8 +5,10 @@ from pymongo import MongoClient from ted_sws import config +from ted_sws.core.model.notice import Notice, NoticeStatus from ted_sws.core.model.supra_notice import DailyNoticesMetadata from ted_sws.data_manager.adapters.daily_notices_metadata_repository import DailyNoticesMetadataRepository +from ted_sws.data_manager.adapters.notice_repository import NoticeRepository from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI DEFAULT_TED_API_START_DATE = "2023-09-29" # TODO: Change to 2014-01-01 @@ -57,7 +59,7 @@ def update_daily_notices_metadata_from_ted(start_date: date = None, # Getting from metadata repository dates that are not in the repository from date range dates_not_in_repository = [day for day in date_range if - not daily_notices_metadata_repo.list_daily_notices_metadata_aggregation_date()] + day not in daily_notices_metadata_repo.list_daily_notices_metadata_aggregation_date()] # Getting from TED API dates that are not in the repository from date range # TODO: If in ted are 0 notices, coverage is 1 to all @@ -70,3 +72,51 @@ def update_daily_notices_metadata_from_ted(start_date: date = None, daily_notices_metadata = DailyNoticesMetadata(aggregation_date=day) daily_notices_metadata.ted_api_notice_ids = [notice[TED_API_NOTICE_ID_FIELD] for notice in notice_ids] daily_notices_metadata_repo.add(daily_notices_metadata) + + +def update_daily_notices_metadata_with_fetched_data(start_date: date = None, + end_date: date = None, + ted_api: TedAPIAdapter = None, + mongo_client: MongoClient = None, + daily_notices_metadata_repo: DailyNoticesMetadataRepository = None): + """ + Updates the daily notices metadata witch fetched data. + """ + + start_date = start_date or datetime.strptime(DEFAULT_TED_API_START_DATE, DEFAULT_TED_API_START_DATE_FORMAT) + end_date = end_date or datetime.today() - timedelta(days=1) + + if start_date > end_date: + raise Exception("Start date cannot be greater than end date") + + ted_api = ted_api or TedAPIAdapter(TedRequestAPI(), config.TED_API_URL) + mongo_client = mongo_client or MongoClient(config.MONGO_DB_AUTH_URL) + daily_notices_metadata_repo = daily_notices_metadata_repo or DailyNoticesMetadataRepository(mongo_client) + notice_repo = NoticeRepository(mongo_client) + + # Generate list of dates from date range + date_range = generate_list_of_dates_from_date_range(start_date, end_date) + + for day in date_range: + daily_notices_metadata = daily_notices_metadata_repo.get(day) + for notice_id in daily_notices_metadata.ted_api_notice_ids: + notice: Notice = notice_repo.get(notice_id) + + notice_statuses = {notice_status: 0 for notice_status in daily_notices_metadata.notice_statuses} + mapping_suite_packages = [] + fetched_notice_ids = [] + + if notice: + fetched_notice_ids.append(notice_id) + notice_status = notice.status + notice_statuses[notice_status] += 1 + if notice_status >= NoticeStatus.TRANSFORMED: # Having distilled_rdf_manifestation + mapping_suite_id = notice.rdf_manifestation.mapping_suite_id + mapping_suite_packages.append(mapping_suite_id) + + daily_notices_metadata.notice_statuses= notice_statuses + daily_notices_metadata.mapping_suite_packages = mapping_suite_packages + daily_notices_metadata.fetched_notice_ids = fetched_notice_ids + + + daily_notices_metadata_repo.update(daily_notices_metadata)