diff --git a/dags/pipelines/notice_selectors_pipelines.py b/dags/pipelines/notice_selectors_pipelines.py index 541ae603..f24789e8 100644 --- a/dags/pipelines/notice_selectors_pipelines.py +++ b/dags/pipelines/notice_selectors_pipelines.py @@ -5,6 +5,7 @@ FORM_NUMBER = "normalised_metadata.form_number" XSD_VERSION = "normalised_metadata.xsd_version" PUBLICATION_DATE = "normalised_metadata.publication_date" +MAPPING_SUITE_ID = "mapping_suite_id" def build_selector_mongodb_filter(notice_statuses: List[str], form_number: str = None, @@ -58,3 +59,22 @@ def notice_ids_selector_by_status(notice_statuses: List[NoticeStatus], form_numb xsd_version=xsd_version) mongodb_result_iterator = notice_repository.collection.find(mongodb_filter, {NOTICE_TED_ID: 1}) return [result_dict[NOTICE_TED_ID] for result_dict in mongodb_result_iterator] + + +def notice_ids_selector_by_mapping_suite_id(mapping_suite_id: str) -> List[str]: + """ + Return notice_ids for notices what are transformed with specific mapping_suite_id. + :param mapping_suite_id: + """ + from pymongo import MongoClient + from ted_sws import config + from ted_sws.data_manager.adapters.manifestation_repository import RDFManifestationRepository, \ + AGGREGATE_REFERENCE_ID, MANIFESTATION_TYPE_ID + + mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) + rdf_manifestation_repository = RDFManifestationRepository(mongodb_client=mongodb_client) + mongodb_filter = {MANIFESTATION_TYPE_ID: {"$eq": "rdf"}, + MAPPING_SUITE_ID: {"$eq": mapping_suite_id} + } + mongodb_result_iterator = rdf_manifestation_repository.collection.find(mongodb_filter, {AGGREGATE_REFERENCE_ID: 1}) + return [result_dict[AGGREGATE_REFERENCE_ID] for result_dict in mongodb_result_iterator] diff --git a/dags/reprocess_untransformed_notices_from_backlog.py b/dags/reprocess_untransformed_notices_from_backlog.py index d65503ab..ecba478d 100644 --- a/dags/reprocess_untransformed_notices_from_backlog.py +++ b/dags/reprocess_untransformed_notices_from_backlog.py @@ -3,12 +3,14 @@ """ from airflow.decorators import dag, task +from airflow.models import Param from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status +from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status, \ + notice_ids_selector_by_mapping_suite_id from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \ END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM from ted_sws.core.model.notice import NoticeStatus @@ -18,6 +20,8 @@ DAG_NAME = "reprocess_untransformed_notices_from_backlog" +MAPPING_SUITE_ID_DAG_PARAM = "mapping_suite_id" + RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION, @@ -25,6 +29,16 @@ ] TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" +RE_TRANSFORM_DAG_PARAMS = {**REPROCESS_DAG_PARAMS, + MAPPING_SUITE_ID_DAG_PARAM: Param( + default=None, + type=["null", "string"], + format="string", + title="Mapping Suite ID", + description="This field is optional. If you want to filter transformed notices by Mapping Suite ID, please insert a Mapping Suite ID." + ) +} + @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, @@ -46,9 +60,13 @@ def select_notices_for_re_transform(): start_date = get_dag_param(key=START_DATE_DAG_PARAM) end_date = get_dag_param(key=END_DATE_DAG_PARAM) xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM) + mapping_suite_id = get_dag_param(key=MAPPING_SUITE_ID_DAG_PARAM) notice_ids = notice_ids_selector_by_status(notice_statuses=RE_TRANSFORM_TARGET_NOTICE_STATES, form_number=form_number, start_date=start_date, end_date=end_date, xsd_version=xsd_version) + if mapping_suite_id: + filtered_notice_ids_by_mapping_suite_id = notice_ids_selector_by_mapping_suite_id(mapping_suite_id=mapping_suite_id) + notice_ids = list(set(notice_ids).intersection(set(filtered_notice_ids_by_mapping_suite_id))) push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator(