Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ted4 22 #509

Merged
merged 2 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions dags/pipelines/notice_selectors_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
FORM_NUMBER = "normalised_metadata.form_number"
XSD_VERSION = "normalised_metadata.xsd_version"
PUBLICATION_DATE = "normalised_metadata.publication_date"
MAPPING_SUITE_ID = "mapping_suite_id"


def build_selector_mongodb_filter(notice_statuses: List[str], form_number: str = None,
Expand Down Expand Up @@ -58,3 +59,22 @@ def notice_ids_selector_by_status(notice_statuses: List[NoticeStatus], form_numb
xsd_version=xsd_version)
mongodb_result_iterator = notice_repository.collection.find(mongodb_filter, {NOTICE_TED_ID: 1})
return [result_dict[NOTICE_TED_ID] for result_dict in mongodb_result_iterator]


def notice_ids_selector_by_mapping_suite_id(mapping_suite_id: str) -> List[str]:
"""
Return notice_ids for notices what are transformed with specific mapping_suite_id.
:param mapping_suite_id:
"""
from pymongo import MongoClient
from ted_sws import config
from ted_sws.data_manager.adapters.manifestation_repository import RDFManifestationRepository, \
AGGREGATE_REFERENCE_ID, MANIFESTATION_TYPE_ID

mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
rdf_manifestation_repository = RDFManifestationRepository(mongodb_client=mongodb_client)
mongodb_filter = {MANIFESTATION_TYPE_ID: {"$eq": "rdf"},
MAPPING_SUITE_ID: {"$eq": mapping_suite_id}
}
mongodb_result_iterator = rdf_manifestation_repository.collection.find(mongodb_filter, {AGGREGATE_REFERENCE_ID: 1})
return [result_dict[AGGREGATE_REFERENCE_ID] for result_dict in mongodb_result_iterator]
20 changes: 19 additions & 1 deletion dags/reprocess_untransformed_notices_from_backlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
"""

from airflow.decorators import dag, task
from airflow.models import Param

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status, \
notice_ids_selector_by_mapping_suite_id
from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
from ted_sws.core.model.notice import NoticeStatus
Expand All @@ -18,13 +20,25 @@

DAG_NAME = "reprocess_untransformed_notices_from_backlog"

MAPPING_SUITE_ID_DAG_PARAM = "mapping_suite_id"

RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION,
NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
NoticeStatus.TRANSFORMED, NoticeStatus.DISTILLED
]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"

RE_TRANSFORM_DAG_PARAMS = {**REPROCESS_DAG_PARAMS,
MAPPING_SUITE_ID_DAG_PARAM: Param(
default=None,
type=["null", "string"],
format="string",
title="Mapping Suite ID",
description="This field is optional. If you want to filter transformed notices by Mapping Suite ID, please insert a Mapping Suite ID."
)
}


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
Expand All @@ -46,9 +60,13 @@ def select_notices_for_re_transform():
start_date = get_dag_param(key=START_DATE_DAG_PARAM)
end_date = get_dag_param(key=END_DATE_DAG_PARAM)
xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM)
mapping_suite_id = get_dag_param(key=MAPPING_SUITE_ID_DAG_PARAM)
notice_ids = notice_ids_selector_by_status(notice_statuses=RE_TRANSFORM_TARGET_NOTICE_STATES,
form_number=form_number, start_date=start_date,
end_date=end_date, xsd_version=xsd_version)
if mapping_suite_id:
filtered_notice_ids_by_mapping_suite_id = notice_ids_selector_by_mapping_suite_id(mapping_suite_id=mapping_suite_id)
notice_ids = list(set(notice_ids).intersection(set(filtered_notice_ids_by_mapping_suite_id)))
push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator(
Expand Down
Loading