Skip to content

Commit

Permalink
implement DAGs trigger UI
Browse files Browse the repository at this point in the history
  • Loading branch information
CaptainOfHacks committed Sep 18, 2023
1 parent 8540649 commit 61120b5
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 50 deletions.
39 changes: 34 additions & 5 deletions dags/fetch_notices_by_date_range.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from datetime import datetime
from datetime import datetime, date
from typing import Any

from airflow.models import Param
from dateutil import rrule

from airflow.decorators import dag, task
Expand Down Expand Up @@ -29,11 +31,37 @@ def generate_wildcards_foreach_day_in_range(start_date: str, end_date: str) -> l
"""
return [dt.strftime('%Y%m%d*')
for dt in rrule.rrule(rrule.DAILY,
dtstart=datetime.strptime(start_date, '%Y%m%d'),
until=datetime.strptime(end_date, '%Y%m%d'))]
dtstart=datetime.strptime(start_date, '%Y-%m-%d'),
until=datetime.strptime(end_date, '%Y-%m-%d'))]


@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'])
@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'],
params={
START_DATE_KEY: Param(
default=f"{date.today()}",
type="string",
format="date",
title="Start Date",
description="""This field is required.
Start date of the date range to fetch notices from TED."""
),
END_DATE_KEY: Param(
default=f"{date.today()}",
type="string",
format="date",
title="End Date",
description="""This field is required.
End date of the date range to fetch notices from TED."""
),
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param(
default=False,
type="boolean",
title="Trigger Complete Workflow",
description="""This field is required.
If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
)
}
)
def fetch_notices_by_date_range():
@task
@event_log(TechnicalEventMessage(
Expand All @@ -46,7 +74,8 @@ def trigger_notice_by_date_for_each_date_in_range():
context: Any = get_current_context()
start_date = get_dag_param(key=START_DATE_KEY, raise_error=True)
end_date = get_dag_param(key=END_DATE_KEY, raise_error=True)
trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, default_value=False)
trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY,
default_value=False)
date_wildcards = generate_wildcards_foreach_day_in_range(start_date, end_date)
for date_wildcard in date_wildcards:
TriggerDagRunOperator(
Expand Down
20 changes: 19 additions & 1 deletion dags/fetch_notices_by_query.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow.decorators import dag, task
from airflow.models import Param
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
Expand All @@ -22,7 +23,24 @@

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['fetch'])
tags=['fetch'],
params={
QUERY_DAG_KEY: Param(
default=None,
type="string",
title="Query",
description="""This field is required.
Query to fetch notices from TED."""
),
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param(
default=True,
type="boolean",
title="Trigger Complete Workflow",
description="""This field is required.
If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
)
}
)
def fetch_notices_by_query():
@task
@event_log(TechnicalEventMessage(
Expand Down
37 changes: 34 additions & 3 deletions dags/load_mapping_suite_in_database.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from airflow.decorators import dag, task
from airflow.operators.dummy import DummyOperator
from airflow.models import Param
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import get_current_context, BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
from pymongo import MongoClient
Expand Down Expand Up @@ -30,7 +31,37 @@

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['fetch', 'mapping-suite', 'github'])
tags=['fetch', 'mapping-suite', 'github'],
params={
GITHUB_REPOSITORY_URL_DAG_PARAM_KEY: Param(
default=None,
type=["null", "string"],
title="Github repository url",
description="""This is optional field.
Github repository url to fetch mapping suite package from."""
),
BRANCH_OR_TAG_NAME_DAG_PARAM_KEY: Param(
default=None,
type=["null", "string"],
title="Branch or tag name",
description="""This is optional field.
Branch or tag name to fetch mapping suite package from."""
),
MAPPING_SUITE_PACKAGE_NAME_DAG_PARAM_KEY: Param(
default=None,
type=["null", "string"],
title="Mapping suite package name",
description="""This is optional field.
Mapping suite package name to fetch from github repository."""
),
LOAD_TEST_DATA_DAG_PARAM_KEY: Param(
default=False,
type="boolean",
title="Load test data",
description="""This field is used to load test data."""
)
}
)
def load_mapping_suite_in_database():
@task
@event_log(is_loggable=False)
Expand Down Expand Up @@ -77,7 +108,7 @@ def _branch_selector():
task_id=CHECK_IF_LOAD_TEST_DATA_TASK_ID,
python_callable=_branch_selector,
)
finish_step = DummyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID,
finish_step = EmptyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)

trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID)
Expand Down
18 changes: 17 additions & 1 deletion dags/load_notices_in_fuseki.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow.decorators import dag, task
from airflow.models import Param
from pymongo import MongoClient

from dags import DEFAULT_DAG_ARGUMENTS
Expand All @@ -17,7 +18,22 @@

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['load', 'notices', 'fuseki'])
tags=['load', 'notices', 'fuseki'],
params={
FUSEKI_DATASET_NAME_DAG_PARAM_KEY: Param(
default=DEFAULT_FUSEKI_DATASET_NAME,
type="string",
title="Fuseki dataset name",
description="This field is used to specify the name of the dataset in Fuseki."
),
NOTICE_STATUS_DAG_PARAM_KEY: Param(
default=str(NoticeStatus.PUBLISHED),
type="string",
title="Notice status",
description="This field is used to filter notices by status."
)
}
)
def load_notices_in_fuseki():
@task
def load_distilled_rdf_manifestations_in_fuseki():
Expand Down
2 changes: 1 addition & 1 deletion dags/notice_processing_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def _selector_branch_before_publish():
def _stop_processing():
notice_ids = smart_xcom_pull(key=NOTICE_IDS_KEY)
if not notice_ids:
raise Exception(f"No notice has been processed!")
raise Exception("No notice has been processed!")

start_processing = BranchPythonOperator(
task_id=BRANCH_SELECTOR_TASK_ID,
Expand Down
43 changes: 43 additions & 0 deletions dags/reprocess_dag_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from airflow.models import Param

# DAG-param key names shared by the notice-reprocessing DAGs.
# Each key is used both in the `params={...}` dict of a @dag and in
# get_dag_param(...) lookups inside the DAG tasks.
FORM_NUMBER_DAG_PARAM = "form_number"  # optional filter: notice form number
START_DATE_DAG_PARAM = "start_date"  # optional filter: start of date range
END_DATE_DAG_PARAM = "end_date"  # optional filter: end of date range
XSD_VERSION_DAG_PARAM = "xsd_version"  # optional filter: notice XSD version

REPROCESS_DATE_RANGE_DAG_PARAMS = {
START_DATE_DAG_PARAM: Param(
default=None,
type=["null", "string"],
format="date",
title="Start Date",
description="""This field is optional. If you want to filter notices by start date, please insert a date.
This field needs to be used with the end date field.
"""
),
END_DATE_DAG_PARAM: Param(
default=None,
type=["null", "string"],
format="date",
title="End Date",
description="""This field is optional. If you want to filter notices by end date, please insert a date.
This field needs to be used with the start date field.
""",
)
}

# Full set of Airflow trigger-UI params for the notice-reprocessing DAGs:
# the optional form-number and XSD-version filters plus the shared
# date-range params merged in via REPROCESS_DATE_RANGE_DAG_PARAMS.
# All filters are optional (type allows null, default None).
REPROCESS_DAG_PARAMS = {
    FORM_NUMBER_DAG_PARAM: Param(
        default=None,
        type=["null", "string"],
        title="Form Number",
        description="This field is optional. If you want to filter notices by form number, please insert a form number."),
    **REPROCESS_DATE_RANGE_DAG_PARAMS,
    XSD_VERSION_DAG_PARAM: Param(
        default=None,
        type=["null", "string"],
        # NOTE: dropped the previous `format="string"` — "string" is a JSON
        # Schema *type*, not a valid `format` value, so jsonschema silently
        # ignored it (the date params correctly use format="date").
        title="XSD Version",
        description="This field is optional. If you want to filter notices by XSD version, please insert a XSD version."
    )
}
13 changes: 6 additions & 7 deletions dags/reprocess_published_in_cellar_notices.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
EXECUTE_ONLY_ONE_STEP_KEY
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
Expand All @@ -15,15 +16,13 @@

RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.PUBLICLY_AVAILABLE]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
FORM_NUMBER_DAG_PARAM = "form_number"
START_DATE_DAG_PARAM = "start_date"
END_DATE_DAG_PARAM = "end_date"
XSD_VERSION_DAG_PARAM = "xsd_version"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['selector', 're-transform-publicly-available'])
tags=['selector', 're-transform-publicly-available'],
params=REPROCESS_DAG_PARAMS
)
def reprocess_published_in_cellar_notices():
@task
@event_log(TechnicalEventMessage(
Expand Down
9 changes: 4 additions & 5 deletions dags/reprocess_unnormalised_notices_from_backlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
from dags.reprocess_dag_params import START_DATE_DAG_PARAM, END_DATE_DAG_PARAM, REPROCESS_DATE_RANGE_DAG_PARAMS

DAG_NAME = "reprocess_unnormalised_notices_from_backlog"

TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
FORM_NUMBER_DAG_PARAM = "form_number"
START_DATE_DAG_PARAM = "start_date"
END_DATE_DAG_PARAM = "end_date"
XSD_VERSION_DAG_PARAM = "xsd_version"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['selector', 'raw-notices'])
tags=['selector', 'raw-notices'],
params=REPROCESS_DATE_RANGE_DAG_PARAMS
)
def reprocess_unnormalised_notices_from_backlog():
@task
@event_log(TechnicalEventMessage(
Expand Down
10 changes: 5 additions & 5 deletions dags/reprocess_unpackaged_notices_from_backlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM

DAG_NAME = "reprocess_unpackaged_notices_from_backlog"

RE_PACKAGE_TARGET_NOTICE_STATES = [NoticeStatus.VALIDATED, NoticeStatus.INELIGIBLE_FOR_PACKAGING,
NoticeStatus.ELIGIBLE_FOR_PACKAGING,
NoticeStatus.INELIGIBLE_FOR_PUBLISHING]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
FORM_NUMBER_DAG_PARAM = "form_number"
START_DATE_DAG_PARAM = "start_date"
END_DATE_DAG_PARAM = "end_date"
XSD_VERSION_DAG_PARAM = "xsd_version"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['selector', 're-package'])
tags=['selector', 're-package'],
params=REPROCESS_DAG_PARAMS
)
def reprocess_unpackaged_notices_from_backlog():
@task
@event_log(TechnicalEventMessage(
Expand Down
13 changes: 6 additions & 7 deletions dags/reprocess_unpublished_notices_from_backlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_processing_pipeline import NOTICE_PUBLISH_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
EXECUTE_ONLY_ONE_STEP_KEY
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

Expand All @@ -17,15 +18,13 @@
NoticeStatus.PACKAGED, NoticeStatus.PUBLICLY_UNAVAILABLE
]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
FORM_NUMBER_DAG_PARAM = "form_number"
START_DATE_DAG_PARAM = "start_date"
END_DATE_DAG_PARAM = "end_date"
XSD_VERSION_DAG_PARAM = "xsd_version"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['selector', 're-publish'])
tags=['selector', 're-publish'],
params=REPROCESS_DAG_PARAMS
)
def reprocess_unpublished_notices_from_backlog():
@task
@event_log(TechnicalEventMessage(
Expand Down
16 changes: 8 additions & 8 deletions dags/reprocess_untransformed_notices_from_backlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
EXECUTE_ONLY_ONE_STEP_KEY
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
Expand All @@ -14,19 +15,18 @@
DAG_NAME = "reprocess_untransformed_notices_from_backlog"

RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION,
NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
NoticeStatus.TRANSFORMED, NoticeStatus.DISTILLED
]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
FORM_NUMBER_DAG_PARAM = "form_number"
START_DATE_DAG_PARAM = "start_date"
END_DATE_DAG_PARAM = "end_date"
XSD_VERSION_DAG_PARAM = "xsd_version"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['selector', 're-transform'])
tags=['selector', 're-transform'],
params=REPROCESS_DAG_PARAMS
)
def reprocess_untransformed_notices_from_backlog():
@task
@event_log(TechnicalEventMessage(
Expand Down
Loading

0 comments on commit 61120b5

Please sign in to comment.