Skip to content

Commit

Permalink
add filter by mapping_suite_id for re-transform DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
CaptainOfHacks committed Oct 11, 2023
1 parent 231af55 commit 14b0319
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
20 changes: 20 additions & 0 deletions dags/pipelines/notice_selectors_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# Field names used when building MongoDB filters for notice selection.
# NOTE(review): the dotted names presumably address nested fields of the stored
# notice document - confirm against the filter builder in this module.
FORM_NUMBER = "normalised_metadata.form_number"
XSD_VERSION = "normalised_metadata.xsd_version"
PUBLICATION_DATE = "normalised_metadata.publication_date"
# Field matched against the requested mapping suite id when querying RDF manifestations.
MAPPING_SUITE_ID = "mapping_suite_id"


def build_selector_mongodb_filter(notice_statuses: List[str], form_number: str = None,
Expand Down Expand Up @@ -58,3 +59,22 @@ def notice_ids_selector_by_status(notice_statuses: List[NoticeStatus], form_numb
xsd_version=xsd_version)
mongodb_result_iterator = notice_repository.collection.find(mongodb_filter, {NOTICE_TED_ID: 1})
return [result_dict[NOTICE_TED_ID] for result_dict in mongodb_result_iterator]


def notice_ids_selector_by_mapping_suite_id(mapping_suite_id: str) -> List[str]:
    """
    Return the notice ids of notices that were transformed with a specific mapping suite.

    Queries the RDF manifestation collection for documents whose manifestation type
    is "rdf" and whose mapping suite id equals the given one, and collects the
    aggregate reference id of every match. The result is not de-duplicated.

    :param mapping_suite_id: mapping suite id that the RDF manifestation must match exactly
    :return: list with the aggregate reference id of each matching RDF manifestation
    """
    # Imports are kept local to the function, as in the original implementation.
    from pymongo import MongoClient
    from ted_sws import config
    from ted_sws.data_manager.adapters.manifestation_repository import RDFManifestationRepository, \
        AGGREGATE_REFERENCE_ID, MANIFESTATION_TYPE_ID

    # Explicit "$eq" makes MongoDB treat the supplied value as a literal to compare
    # against, even if a non-string value were ever passed in.
    mongodb_filter = {
        MANIFESTATION_TYPE_ID: {"$eq": "rdf"},
        MAPPING_SUITE_ID: {"$eq": mapping_suite_id},
    }
    # Only the aggregate reference id is needed from each matching document.
    projection = {AGGREGATE_REFERENCE_ID: 1}

    # Use the client as a context manager so its connections are released when the
    # query is done; the cursor is fully consumed before the client is closed.
    with MongoClient(config.MONGO_DB_AUTH_URL) as mongodb_client:
        rdf_manifestation_repository = RDFManifestationRepository(mongodb_client=mongodb_client)
        mongodb_result_iterator = rdf_manifestation_repository.collection.find(mongodb_filter, projection)
        return [result_dict[AGGREGATE_REFERENCE_ID] for result_dict in mongodb_result_iterator]
20 changes: 19 additions & 1 deletion dags/reprocess_untransformed_notices_from_backlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
"""

from airflow.decorators import dag, task
from airflow.models import Param

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status, \
notice_ids_selector_by_mapping_suite_id
from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
from ted_sws.core.model.notice import NoticeStatus
Expand All @@ -18,13 +20,25 @@

# Name of this DAG.
DAG_NAME = "reprocess_untransformed_notices_from_backlog"

# Key of the optional DAG parameter that carries a Mapping Suite ID.
MAPPING_SUITE_ID_DAG_PARAM = "mapping_suite_id"

# Notice statuses handed to the status-based selector when choosing notices to re-transform.
RE_TRANSFORM_TARGET_NOTICE_STATES = [
    NoticeStatus.NORMALISED_METADATA,
    NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
    NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION,
    NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
    NoticeStatus.TRANSFORMED,
    NoticeStatus.DISTILLED,
]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"

# The shared reprocess parameters extended with the optional Mapping Suite ID parameter.
RE_TRANSFORM_DAG_PARAMS = {
    **REPROCESS_DAG_PARAMS,
    MAPPING_SUITE_ID_DAG_PARAM: Param(
        default=None,
        type=["null", "string"],
        format="string",
        title="Mapping Suite ID",
        description="This field is optional. If you want to filter transformed notices by Mapping Suite ID, please insert a Mapping Suite ID.",
    ),
}


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
Expand All @@ -46,9 +60,13 @@ def select_notices_for_re_transform():
start_date = get_dag_param(key=START_DATE_DAG_PARAM)
end_date = get_dag_param(key=END_DATE_DAG_PARAM)
xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM)
mapping_suite_id = get_dag_param(key=MAPPING_SUITE_ID_DAG_PARAM)
notice_ids = notice_ids_selector_by_status(notice_statuses=RE_TRANSFORM_TARGET_NOTICE_STATES,
form_number=form_number, start_date=start_date,
end_date=end_date, xsd_version=xsd_version)
if mapping_suite_id:
filtered_notice_ids_by_mapping_suite_id = notice_ids_selector_by_mapping_suite_id(mapping_suite_id=mapping_suite_id)
notice_ids = list(set(notice_ids).intersection(set(filtered_notice_ids_by_mapping_suite_id)))
push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator(
Expand Down

0 comments on commit 14b0319

Please sign in to comment.