Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new normalisation #510

Closed
wants to merge 134 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
134 commits
Select commit Hold shift + click to select a range
7b109a6
implement github download with credentials
CaptainOfHacks Jul 4, 2023
5461333
Fixed tests for notice_publisher feature
kaleanych Jul 4, 2023
27b4228
Merge pull request #485 from OP-TED/feature/TED-1401
CaptainOfHacks Jul 4, 2023
aef716e
Update conceptual_mapping_processor.py
CaptainOfHacks Jul 4, 2023
c3a866a
Merge pull request #486 from OP-TED/feature/TED-1401
CaptainOfHacks Jul 4, 2023
0ff927d
wip
CaptainOfHacks Jul 4, 2023
61f8d9c
Merge pull request #487 from OP-TED/feature/TED-1401
CaptainOfHacks Jul 4, 2023
49ac175
Update notice_processing_pipeline.py
CaptainOfHacks Sep 8, 2023
2503685
Update DagBatchPipelineOperator.py
CaptainOfHacks Sep 8, 2023
9410465
Update notice_batch_processor_pipelines.py
CaptainOfHacks Sep 8, 2023
619cc4b
Update notice_processor_pipelines.py
CaptainOfHacks Sep 8, 2023
7cb91d9
Update pipeline_protocols.py
CaptainOfHacks Sep 8, 2023
52bc6e7
Create MappingSuiteTransformationPool
CaptainOfHacks Sep 8, 2023
188ef86
Update documentation
CaptainOfHacks Sep 8, 2023
7f6d107
Extend notice eligibility services
CaptainOfHacks Sep 8, 2023
6c68d78
Create notice_batch_transformer.py
CaptainOfHacks Sep 8, 2023
56a8152
Update docker-compose-worker.yaml
CaptainOfHacks Sep 8, 2023
9cc2aab
Add AIRFLOW_NUMBER_OF_WORKERS env config
CaptainOfHacks Sep 8, 2023
bcb162c
Update docker-compose.yaml
CaptainOfHacks Sep 12, 2023
c60ddf3
Update docker-compose.yaml
CaptainOfHacks Sep 12, 2023
e41e97c
Update Airflow version to 2.7.1
CaptainOfHacks Sep 12, 2023
9c3922a
Update rmlmapper to 6.2.1
CaptainOfHacks Sep 12, 2023
f0d8ebd
Update project requirements
CaptainOfHacks Sep 12, 2023
905c68b
update sample application with new queries
duprijil Sep 12, 2023
05e7895
Create sample_application.xlsx
duprijil Sep 12, 2023
4ac8b8b
Adjust tests and model logic to suport pydantic 2 version
CaptainOfHacks Sep 12, 2023
899d41f
Merge pull request #489 from OP-TED/feature/TDA-272
CaptainOfHacks Sep 12, 2023
bacff55
Update conftest.py
CaptainOfHacks Sep 12, 2023
c3ff970
Update conftest.py
CaptainOfHacks Sep 12, 2023
0c44bfa
Update entity_deduplication.py
CaptainOfHacks Sep 12, 2023
850d48a
Merge branch 'feature/TED6-37' into feature/TED6-23
CaptainOfHacks Sep 12, 2023
6adc67a
Merge pull request #488 from OP-TED/feature/TED6-37
CaptainOfHacks Sep 12, 2023
c968cc7
Merge pull request #490 from OP-TED/feature/TED6-23
CaptainOfHacks Sep 12, 2023
8a756b1
fix AirflowNumber of workers config
CaptainOfHacks Sep 12, 2023
6f81d83
add ignore warning deployment exposure
CaptainOfHacks Sep 12, 2023
3274fd0
Merge pull request #491 from OP-TED/feature/TED6-23
CaptainOfHacks Sep 12, 2023
2e84ddb
Sample application page moved to another repo
duprijil Sep 13, 2023
616d67b
delete unnecessary images
duprijil Sep 13, 2023
f762e74
Create test_new_trigger_ui.py
CaptainOfHacks Sep 15, 2023
d51dbb3
delete unnecessary files
duprijil Sep 15, 2023
47c8a01
Merge pull request #492 from OP-TED/feature/TED-1416
costezki Sep 17, 2023
5962693
Create test_dag_a.py
CaptainOfHacks Sep 18, 2023
6a288cd
Update test_dag_a.py
CaptainOfHacks Sep 18, 2023
aaec47c
Update test_dag_a.py
CaptainOfHacks Sep 18, 2023
78f82f0
Update test_dag_a.py
CaptainOfHacks Sep 18, 2023
e6e035e
Update test_dag_a.py
CaptainOfHacks Sep 18, 2023
8540649
Create test_dag_b_trigger.py
CaptainOfHacks Sep 18, 2023
61120b5
implement DAGs trigger UI
CaptainOfHacks Sep 18, 2023
6d0b7b7
Update fetch_notices_by_date.py
CaptainOfHacks Sep 18, 2023
782fa58
Update fetch_notices_by_query.py
CaptainOfHacks Sep 18, 2023
f01dd75
Update fetch_notices_by_date.py
CaptainOfHacks Sep 18, 2023
4fc420d
Update fetch_notices_by_query.py
CaptainOfHacks Sep 18, 2023
5da8d5e
remove test dags
CaptainOfHacks Sep 18, 2023
f187842
Revert "remove test dags"
CaptainOfHacks Sep 18, 2023
7f48819
add DAGs documentation feature
CaptainOfHacks Sep 18, 2023
0b8c19a
Merge pull request #493 from OP-TED/feature/TED6-33
CaptainOfHacks Sep 19, 2023
9840689
minimize project requirements
CaptainOfHacks Sep 19, 2023
2a19e37
Update requirements.txt
CaptainOfHacks Sep 19, 2023
6eeb9b3
Merge pull request #494 from OP-TED/feature/TED6-40
CaptainOfHacks Sep 20, 2023
8534c81
Update DagBatchPipelineOperator.py
CaptainOfHacks Sep 20, 2023
acf52b4
Update DagBatchPipelineOperator.py
CaptainOfHacks Sep 20, 2023
0764261
Update DagBatchPipelineOperator.py
CaptainOfHacks Sep 20, 2023
646b688
Merge pull request #495 from OP-TED/feature/TED6-25
CaptainOfHacks Sep 20, 2023
6a62161
Update fetch_notices_by_date.py
CaptainOfHacks Sep 23, 2023
c55262d
Update fetch_notices_by_date_range.py
CaptainOfHacks Sep 23, 2023
5a5807b
Update DagBatchPipelineOperator.py
CaptainOfHacks Sep 23, 2023
0937151
fix processed notice ids
CaptainOfHacks Sep 23, 2023
946a9d2
Merge pull request #496 from OP-TED/feature/TED6-50
CaptainOfHacks Sep 23, 2023
9c4bdda
Update event_message.py
CaptainOfHacks Sep 25, 2023
2c26e0d
Update event_message.py
CaptainOfHacks Sep 25, 2023
e0f85c7
Update DagBatchPipelineOperator.py
CaptainOfHacks Sep 25, 2023
b6b10b6
wip
CaptainOfHacks Sep 25, 2023
e7a3c64
wip
CaptainOfHacks Sep 25, 2023
996306a
WIP
CaptainOfHacks Sep 25, 2023
fd505a4
Update notice_processor_pipelines.py
CaptainOfHacks Sep 25, 2023
f4f8487
Update DagBatchPipelineOperator.py
CaptainOfHacks Sep 25, 2023
4905038
Merge pull request #497 from OP-TED/feature/TED4-55
CaptainOfHacks Sep 25, 2023
5416dd7
Fix when fetch already fetched notices don't remove old lazy_fields
CaptainOfHacks Sep 27, 2023
e491e5d
Merge pull request #498 from OP-TED/feature/TED4-57
CaptainOfHacks Sep 27, 2023
8449c26
implement multiple publish channels
duprijil Sep 29, 2023
ef84a41
change ENABLED config var with BUCKET and HOST
duprijil Sep 29, 2023
d39aff1
Merge pull request #499 from OP-TED/feature/TED6-32
duprijil Sep 29, 2023
54084cc
metabase version update
Dragos0000 Sep 29, 2023
e9b5804
make cellar url configurable from env and airflow
duprijil Sep 29, 2023
b59a77a
fix incorrect type for string values
CaptainOfHacks Sep 29, 2023
7b5d632
rename to CELLAR_WEBAPI_SPARQL_URL
duprijil Sep 29, 2023
88b6af3
Merge pull request #502 from OP-TED/feature/TED4-62
CaptainOfHacks Sep 29, 2023
4870847
Merge pull request #501 from OP-TED/feature/TED4-21
duprijil Sep 29, 2023
4296f0e
create DailyNoticesMetadata model, repository + tests
duprijil Oct 2, 2023
f5fba8e
WIP
duprijil Oct 2, 2023
56cbb7f
WIP
duprijil Oct 3, 2023
b7d4389
WIP
duprijil Oct 3, 2023
6c4f614
add cumulative frequency
CaptainOfHacks Oct 3, 2023
262d3fb
WIP
duprijil Oct 3, 2023
272d0a8
WIP
duprijil Oct 3, 2023
78440f7
WIP
duprijil Oct 3, 2023
102ebf1
prepare tests
duprijil Oct 4, 2023
e61ab85
Update conftest.py
duprijil Oct 4, 2023
107eb9c
Update yarrrml2rml_converter.py
duprijil Oct 4, 2023
a1eb832
update depricated pydantic functions
duprijil Oct 4, 2023
9033665
Merge pull request #504 from OP-TED/feature/TED4-65
duprijil Oct 4, 2023
92044d4
Merge branch 'main' into feature/TED4-10
duprijil Oct 4, 2023
7298fff
Merge branch 'main' into feature/TED4-49
duprijil Oct 4, 2023
a45c5d3
make changes according review
duprijil Oct 4, 2023
6c12c06
Merge pull request #505 from OP-TED/feature/TED4-49
duprijil Oct 4, 2023
97b6258
Merge branch 'main' into feature/TED4-10
duprijil Oct 4, 2023
4f63665
update conftests
duprijil Oct 4, 2023
7440de7
Update test_notices_metadata_services.py
duprijil Oct 4, 2023
6eb5687
Merge pull request #500 from OP-TED/feature/TED4-29
Dragos0000 Oct 4, 2023
095801d
Merge pull request #503 from OP-TED/feature/TED4-10
duprijil Oct 4, 2023
70be603
resolve bug
duprijil Oct 6, 2023
6ee10a5
Update daily_notices_metadata_services.py
duprijil Oct 6, 2023
ba3cf16
Update test_notices_metadata_services.py
duprijil Oct 6, 2023
3e3fcbf
Update daily_notices_metadata_update.py
duprijil Oct 6, 2023
dd053ed
Update daily_notices_metadata_update.py
duprijil Oct 6, 2023
da1977c
Update daily_notices_metadata_update.py
duprijil Oct 6, 2023
b67146b
make start picking at dag level
duprijil Oct 6, 2023
2733359
Update daily_notices_metadata_update.py
duprijil Oct 6, 2023
b1c5a1c
Update daily_notices_metadata_update.py
duprijil Oct 6, 2023
a31ae28
Update daily_notices_metadata_update.py
duprijil Oct 6, 2023
e26fe8b
Merge pull request #506 from OP-TED/feature/TED4-66
duprijil Oct 6, 2023
774c953
code cleaning and update to pydantic v2
CaptainOfHacks Oct 10, 2023
39e3df4
Merge pull request #507 from OP-TED/feature/TED4-64
CaptainOfHacks Oct 11, 2023
318697e
optimise e2e tests
CaptainOfHacks Oct 11, 2023
231af55
clean tests
CaptainOfHacks Oct 11, 2023
8c86cd0
Merge pull request #508 from OP-TED/feature/TED4-67
CaptainOfHacks Oct 11, 2023
14b0319
add filter by mapping_suite_id for re-transform DAG
CaptainOfHacks Oct 11, 2023
0703cb1
Merge branch 'main' into feature/TED4-22
CaptainOfHacks Oct 11, 2023
5aa4c19
Merge pull request #509 from OP-TED/feature/TED4-22
CaptainOfHacks Oct 11, 2023
4bc2a87
extreactor adapters
Dragos0000 Oct 13, 2023
c4c23b3
define new notice extractor and normaliser architecture
CaptainOfHacks Oct 13, 2023
2005615
first draft WIP
Dragos0000 Oct 20, 2023
6e5260f
Update test_notice_packager.py
CaptainOfHacks Oct 20, 2023
80b60a7
test passing
Dragos0000 Oct 23, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ CAROOT = $(shell pwd)/infra/traefik/certs
install:
@ echo -e "$(BUILD_PRINT)Installing the requirements$(END_BUILD_PRINT)"
@ pip install --upgrade pip
@ pip install --no-cache-dir -r requirements.txt --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-no-providers-3.8.txt"
@ pip install --no-cache-dir -r requirements.txt --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-no-providers-3.8.txt"

install-dev:
@ echo -e "$(BUILD_PRINT)Installing the dev requirements$(END_BUILD_PRINT)"
@ pip install --upgrade pip
@ pip install --no-cache-dir -r requirements.dev.txt --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-no-providers-3.8.txt"
@ pip install --no-cache-dir -r requirements.dev.txt --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-no-providers-3.8.txt"

test: test-unit

Expand Down Expand Up @@ -213,7 +213,7 @@ stop-metabase:
init-rml-mapper:
@ echo -e "RMLMapper folder initialisation!"
@ mkdir -p ./.rmlmapper
@ wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.1.3/rmlmapper-6.1.3-r367-all.jar -O ./.rmlmapper/rmlmapper.jar
@ wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.2.1/rmlmapper-6.2.1-r368-all.jar -O ./.rmlmapper/rmlmapper.jar

init-limes:
@ echo -e "Limes folder initialisation!"
Expand Down
7 changes: 7 additions & 0 deletions dags/daily_check_notices_availability_in_cellar.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
"""
This DAG is responsible for checking the availability of notices in the cellar.
"""


from airflow.decorators import dag, task
from pymongo import MongoClient

Expand All @@ -11,6 +16,8 @@


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
description=__doc__[0: __doc__.find(".")],
doc_md=__doc__,
catchup=False,
schedule_interval="0 0 * * *",
tags=['daily', 'validation'])
Expand Down
6 changes: 6 additions & 0 deletions dags/daily_materialized_views_update.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""
This DAG is responsible for updating the materialized views of the notice and batch collections.
"""

from airflow.decorators import dag, task
from pymongo import MongoClient

Expand All @@ -13,6 +17,8 @@

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
catchup=False,
description=__doc__[0: __doc__.find(".")],
doc_md=__doc__,
schedule_interval="0 6 * * *",
tags=['mongodb', 'daily-views-update'])
def daily_materialized_views_update():
Expand Down
73 changes: 73 additions & 0 deletions dags/daily_notices_metadata_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
DAG to update daily notices metadata from TED.
"""

from datetime import date, datetime, timedelta

from airflow.decorators import dag, task
from airflow.models import Param
from airflow.timetables.trigger import CronTriggerTimetable

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param
from ted_sws.supra_notice_manager.services.daily_notices_metadata_services import \
update_daily_notices_metadata_from_ted, \
update_daily_notices_metadata_with_fetched_data

START_DATE_PARAM_KEY = "start_date"
END_DATE_PARAM_KEY = "end_date"
DEFAULT_TED_API_START_DATE = "2014-01-01"
DEFAULT_TED_API_START_DATE_FORMAT = "%Y-%m-%d"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['daily', "dashboards", "metadata", "ted", "notices"],
catchup=False,
timetable=CronTriggerTimetable('0 19 * * *', timezone='UTC'),
description=__doc__[0: __doc__.find(".")],
doc_md=__doc__,
params={
START_DATE_PARAM_KEY: Param(
default=f"{date.today()}",
type="string",
format="date",
title="Start Date",
description="""This field is required.
Start date of the date range to fetch notices from TED."""
),
END_DATE_PARAM_KEY: Param(
default=f"{date.today()}",
type="string",
format="date",
title="End Date",
description="""This field is required.
End date of the date range to fetch notices from TED."""
)
}
)
def daily_notices_metadata_update():
@task
def update_daily_notices_metadata_from_ted_api():
start_date = get_dag_param(key=START_DATE_PARAM_KEY, default_value=DEFAULT_TED_API_START_DATE)
end_date = get_dag_param(key=END_DATE_PARAM_KEY, default_value=(datetime.today() - timedelta(days=1)).strftime(
DEFAULT_TED_API_START_DATE_FORMAT))

update_daily_notices_metadata_from_ted(
start_date=datetime.strptime(start_date, DEFAULT_TED_API_START_DATE_FORMAT),
end_date=datetime.strptime(end_date, DEFAULT_TED_API_START_DATE_FORMAT))

@task
def update_daily_notices_metadata_with_fetched_data_from_repo():
start_date = get_dag_param(key=START_DATE_PARAM_KEY, default_value=DEFAULT_TED_API_START_DATE)
end_date = get_dag_param(key=END_DATE_PARAM_KEY, default_value=(datetime.today() - timedelta(days=1)).strftime(
DEFAULT_TED_API_START_DATE_FORMAT))

update_daily_notices_metadata_with_fetched_data(
start_date=datetime.strptime(start_date, DEFAULT_TED_API_START_DATE_FORMAT),
end_date=datetime.strptime(end_date, DEFAULT_TED_API_START_DATE_FORMAT))

update_daily_notices_metadata_from_ted_api() >> update_daily_notices_metadata_with_fetched_data_from_repo()


dag = daily_notices_metadata_update()
49 changes: 39 additions & 10 deletions dags/fetch_notices_by_date.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from datetime import timedelta
"""
This DAG is used to fetch notices from TED by date.
"""

from datetime import date, datetime, timedelta

from airflow.decorators import dag, task
from airflow.operators.dummy import DummyOperator
from airflow.models import Param
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.timetables.trigger import CronTriggerTimetable
Expand All @@ -17,7 +22,7 @@

DAG_NAME = "fetch_notices_by_date"
BATCH_SIZE = 2000
WILD_CARD_DAG_KEY = "wild_card"
FETCH_DATE_DAG_KEY = "fetch_date"
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
TRIGGER_PARTIAL_WORKFLOW_TASK_ID = "trigger_partial_notice_proc_workflow"
TRIGGER_COMPLETE_WORKFLOW_TASK_ID = "trigger_complete_notice_proc_workflow"
Expand All @@ -28,8 +33,28 @@

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
catchup=False,
description=__doc__[0: __doc__.find(".")],
doc_md=__doc__,
timetable=CronTriggerTimetable('0 1 * * *', timezone='UTC'),
tags=['selector', 'daily-fetch'])
tags=['selector', 'daily-fetch'],
params={
FETCH_DATE_DAG_KEY: Param(
default=f"{date.today() - timedelta(days=1)}",
type="string",
format="date",
title="Date",
description="""This field is required.
Date to fetch notices from TED."""
),
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param(
default=True,
type="boolean",
title="Trigger Complete Workflow",
description="""This field is required.
If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
)
}
)
def fetch_notices_by_date():
@task
@event_log(TechnicalEventMessage(
Expand All @@ -39,7 +64,11 @@ def fetch_notices_by_date():
))
)
def fetch_by_date_notice_from_ted():
notice_ids = notice_fetcher_by_date_pipeline(date_wild_card=get_dag_param(key=WILD_CARD_DAG_KEY))
default_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
selected_date = datetime.strptime(get_dag_param(key=FETCH_DATE_DAG_KEY,
default_value=default_date), "%Y-%m-%d")
date_wild_card = datetime.strftime(selected_date, "%Y%m%d*")
notice_ids = notice_fetcher_by_date_pipeline(date_wild_card=date_wild_card)
if not notice_ids:
log_error("No notices has been fetched!")
else:
Expand All @@ -61,10 +90,10 @@ def validate_fetched_notices():
from ted_sws.supra_notice_manager.services.supra_notice_validator import validate_and_update_daily_supra_notice
from datetime import datetime
from pymongo import MongoClient

publication_date = datetime.strptime(get_dag_param(key=WILD_CARD_DAG_KEY,
default_value=(datetime.now() - timedelta(days=1)).strftime(
"%Y%m%d*")), "%Y%m%d*")
default_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
publication_date = datetime.strptime(get_dag_param(key=FETCH_DATE_DAG_KEY,
default_value=default_date
), "%Y-%m-%d")
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
validate_and_update_daily_supra_notice(ted_publication_date=publication_date, mongodb_client=mongodb_client)

Expand All @@ -91,7 +120,7 @@ def _branch_selector():
python_callable=validate_fetched_notices
)

finish_step = DummyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID,
finish_step = EmptyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)

fetch_by_date_notice_from_ted() >> branch_task >> [trigger_normalisation_workflow,
Expand Down
59 changes: 47 additions & 12 deletions dags/fetch_notices_by_date_range.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from datetime import datetime
"""
This DAG is responsible for fetching notices from TED by date range.
"""

from datetime import datetime, date
from typing import Any

from airflow.models import Param
from dateutil import rrule

from airflow.decorators import dag, task
Expand All @@ -8,7 +14,7 @@

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param
from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, \
from dags.fetch_notices_by_date import FETCH_DATE_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, \
DAG_NAME as FETCH_NOTICES_BY_DATE_DAG_NAME
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
Expand All @@ -20,20 +26,48 @@
END_DATE_KEY = "end_date"


def generate_wildcards_foreach_day_in_range(start_date: str, end_date: str) -> list:
def generate_list_of_dates_from_date_range(start_date: str, end_date: str) -> list:
"""
Given a date range returns all daily dates in that range
:param start_date:
:param end_date:
:return:
"""
return [dt.strftime('%Y%m%d*')
return [dt
for dt in rrule.rrule(rrule.DAILY,
dtstart=datetime.strptime(start_date, '%Y%m%d'),
until=datetime.strptime(end_date, '%Y%m%d'))]
dtstart=datetime.strptime(start_date, '%Y-%m-%d'),
until=datetime.strptime(end_date, '%Y-%m-%d'))]


@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'])
@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'],
description=__doc__[0: __doc__.find(".")],
doc_md=__doc__,
params={
START_DATE_KEY: Param(
default=f"{date.today()}",
type="string",
format="date",
title="Start Date",
description="""This field is required.
Start date of the date range to fetch notices from TED."""
),
END_DATE_KEY: Param(
default=f"{date.today()}",
type="string",
format="date",
title="End Date",
description="""This field is required.
End date of the date range to fetch notices from TED."""
),
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param(
default=False,
type="boolean",
title="Trigger Complete Workflow",
description="""This field is required.
If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
)
}
)
def fetch_notices_by_date_range():
@task
@event_log(TechnicalEventMessage(
Expand All @@ -46,13 +80,14 @@ def trigger_notice_by_date_for_each_date_in_range():
context: Any = get_current_context()
start_date = get_dag_param(key=START_DATE_KEY, raise_error=True)
end_date = get_dag_param(key=END_DATE_KEY, raise_error=True)
trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, default_value=False)
date_wildcards = generate_wildcards_foreach_day_in_range(start_date, end_date)
for date_wildcard in date_wildcards:
trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY,
default_value=False)
fetch_dates = generate_list_of_dates_from_date_range(start_date, end_date)
for fetch_date in fetch_dates:
TriggerDagRunOperator(
task_id=f'trigger_notice_fetch_by_date_workflow_dag_{date_wildcard[:-1]}',
task_id=f"trigger_notice_fetch_by_date_workflow_dag_{fetch_date.strftime('%Y_%m_%d')}",
trigger_dag_id=FETCH_NOTICES_BY_DATE_DAG_NAME,
conf={WILD_CARD_DAG_KEY: date_wildcard,
conf={FETCH_DATE_DAG_KEY: fetch_date.strftime('%Y-%m-%d'),
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: trigger_complete_workflow,
}
).execute(context=context)
Expand Down
32 changes: 29 additions & 3 deletions dags/fetch_notices_by_query.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
"""
This DAG is responsible for fetching notices from TED by query.
"""

from datetime import date

from airflow.decorators import dag, task
from airflow.operators.dummy import DummyOperator
from airflow.models import Param
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
from dags import DEFAULT_DAG_ARGUMENTS
Expand All @@ -22,7 +29,26 @@

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['fetch'])
tags=['fetch'],
description=__doc__[0: __doc__.find(".")],
doc_md=__doc__,
params={
QUERY_DAG_KEY: Param(
default=f"PD=[>={date.today().strftime('%Y%m%d')} AND <={date.today().strftime('%Y%m%d')}]",
type="string",
title="Query",
description="""This field is required.
Query to fetch notices from TED."""
),
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param(
default=True,
type="boolean",
title="Trigger Complete Workflow",
description="""This field is required.
If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
)
}
)
def fetch_notices_by_query():
@task
@event_log(TechnicalEventMessage(
Expand Down Expand Up @@ -58,7 +84,7 @@ def _branch_selector():
python_callable=_branch_selector,
)

finish_step = DummyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID,
finish_step = EmptyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)

fetch_by_query_notice_from_ted() >> branch_task >> [trigger_normalisation_workflow,
Expand Down
Loading
Loading