Feature/ted6 23 #490
Merged: 21 commits, Sep 12, 2023
6 changes: 3 additions & 3 deletions Makefile
@@ -24,12 +24,12 @@ CAROOT = $(shell pwd)/infra/traefik/certs
install:
@ echo -e "$(BUILD_PRINT)Installing the requirements$(END_BUILD_PRINT)"
@ pip install --upgrade pip
-@ pip install --no-cache-dir -r requirements.txt --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-no-providers-3.8.txt"
+@ pip install --no-cache-dir -r requirements.txt --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-no-providers-3.8.txt"

install-dev:
@ echo -e "$(BUILD_PRINT)Installing the dev requirements$(END_BUILD_PRINT)"
@ pip install --upgrade pip
-@ pip install --no-cache-dir -r requirements.dev.txt --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-no-providers-3.8.txt"
+@ pip install --no-cache-dir -r requirements.dev.txt --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-no-providers-3.8.txt"

test: test-unit

@@ -213,7 +213,7 @@ stop-metabase:
init-rml-mapper:
@ echo -e "RMLMapper folder initialisation!"
@ mkdir -p ./.rmlmapper
-@ wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.1.3/rmlmapper-6.1.3-r367-all.jar -O ./.rmlmapper/rmlmapper.jar
+@ wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.2.1/rmlmapper-6.2.1-r368-all.jar -O ./.rmlmapper/rmlmapper.jar

init-limes:
@ echo -e "Limes folder initialisation!"
15 changes: 8 additions & 7 deletions dags/notice_processing_pipeline.py
@@ -7,10 +7,11 @@
from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param, smart_xcom_push, smart_xcom_forward, smart_xcom_pull
from dags.operators.DagBatchPipelineOperator import NoticeBatchPipelineOperator, NOTICE_IDS_KEY, \
-    EXECUTE_ONLY_ONE_STEP_KEY, START_WITH_STEP_NAME_KEY
-from dags.pipelines.notice_batch_processor_pipelines import notices_batch_distillation_pipeline
-from dags.pipelines.notice_processor_pipelines import notice_normalisation_pipeline, notice_transformation_pipeline, \
-    notice_validation_pipeline, notice_package_pipeline, notice_publish_pipeline
+    EXECUTE_ONLY_ONE_STEP_KEY, START_WITH_STEP_NAME_KEY, NUMBER_OF_AIRFLOW_WORKERS
+from dags.pipelines.notice_batch_processor_pipelines import notices_batch_distillation_pipeline, \
+    notice_batch_transformer_pipeline
+from dags.pipelines.notice_processor_pipelines import notice_normalisation_pipeline, notice_validation_pipeline, \
+    notice_package_pipeline, notice_publish_pipeline

NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline"
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline"
@@ -46,8 +47,8 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
-     max_active_runs=256,
-     max_active_tasks=256,
+     max_active_runs=NUMBER_OF_AIRFLOW_WORKERS,
+     max_active_tasks=1,
tags=['worker', 'pipeline'])
def notice_processing_pipeline():
"""
@@ -118,7 +119,7 @@ def _stop_processing():
task_id=NOTICE_NORMALISATION_PIPELINE_TASK_ID,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)

-notice_transformation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_transformation_pipeline,
+notice_transformation_step = NoticeBatchPipelineOperator(batch_pipeline_callable=notice_batch_transformer_pipeline,
task_id=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)

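The scheduling change above moves concurrency from within a run to across runs: each DAG run now executes one batch task at a time, and the number of simultaneously active runs is capped by the configured worker count. A minimal sketch of the pattern, with the worker count hardcoded and the task body stubbed (both are assumptions for illustration):

```python
from datetime import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

NUMBER_OF_AIRFLOW_WORKERS = 4  # assumption: in the PR this is resolved from config

@dag(start_date=datetime(2023, 9, 1),
     schedule_interval=None,
     max_active_runs=NUMBER_OF_AIRFLOW_WORKERS,  # at most one active run per worker
     max_active_tasks=1,                         # tasks within a run execute serially
     tags=['worker', 'pipeline'])
def throttled_pipeline():
    EmptyOperator(task_id="placeholder")  # stands in for the batch operators

throttled_pipeline()
```

Combined with the `AIRFLOW__CELERY__WORKER_CONCURRENCY: 1` settings further down, this pins roughly one batch per worker and leaves parallelism to the threads inside each batch task.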
22 changes: 16 additions & 6 deletions dags/operators/DagBatchPipelineOperator.py
@@ -1,3 +1,4 @@
+from concurrent.futures import ThreadPoolExecutor
from typing import Any, Protocol, List
from uuid import uuid4
from airflow.models import BaseOperator
@@ -6,9 +7,9 @@

from dags.dags_utils import pull_dag_upstream, push_dag_downstream, get_dag_param, smart_xcom_pull, \
smart_xcom_push
-from ted_sws.core.service.batch_processing import chunks
from dags.pipelines.pipeline_protocols import NoticePipelineCallable
from ted_sws import config
+from ted_sws.core.service.batch_processing import chunks
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.event_manager.model.event_message import EventMessage, NoticeEventMessage
from ted_sws.event_manager.services.log import log_notice_error
@@ -17,11 +18,12 @@
NOTICE_IDS_KEY = "notice_ids"
START_WITH_STEP_NAME_KEY = "start_with_step_name"
EXECUTE_ONLY_ONE_STEP_KEY = "execute_only_one_step"
-DEFAULT_NUMBER_OF_CELERY_WORKERS = 144  # TODO: revise this config
+DEFAULT_NUMBER_OF_CELERY_WORKERS = 6  # TODO: revise this config
NOTICE_PROCESSING_PIPELINE_DAG_NAME = "notice_processing_pipeline"
DEFAULT_START_WITH_TASK_ID = "notice_normalisation_pipeline"
DEFAULT_PIPELINE_NAME_FOR_LOGS = "unknown_pipeline_name"
-MAX_BATCH_SIZE = 5000
+NUMBER_OF_AIRFLOW_WORKERS = config.NUMBER_OF_AIRFLOW_WORKERS
+MAX_BATCH_SIZE = 2000


class BatchPipelineCallable(Protocol):
@@ -79,7 +81,7 @@ def execute(self, context: Any):
processed_notice_ids.extend(
self.batch_pipeline_callable(notice_ids=notice_ids, mongodb_client=mongodb_client))
elif self.notice_pipeline_callable is not None:
-for notice_id in notice_ids:
+def multithread_notice_processor(notice_id: str):
notice = None
try:
notice_event = NoticeEventMessage(notice_id=notice_id, domain_action=pipeline_name)
@@ -96,13 +98,21 @@ def execute(self, context: Any):
notice_event.notice_eforms_subtype = notice.normalised_metadata.eforms_subtype
notice_event.notice_status = str(notice.status)
logger.info(event_message=notice_event)
-except Exception as e:
+    error_message = result_notice_pipeline.error_message
+except Exception as exception_error_message:
+    error_message = str(exception_error_message)
+if error_message:
notice_normalised_metadata = notice.normalised_metadata if notice else None
-    log_notice_error(message=str(e), notice_id=notice_id, domain_action=pipeline_name,
+    log_notice_error(message=error_message, notice_id=notice_id, domain_action=pipeline_name,
notice_form_number=notice_normalised_metadata.form_number if notice_normalised_metadata else None,
notice_status=notice.status if notice else None,
notice_eforms_subtype=notice_normalised_metadata.eforms_subtype if notice_normalised_metadata else None)

+with ThreadPoolExecutor() as executor:
+    futures = [executor.submit(multithread_notice_processor, notice_id) for notice_id in
+               notice_ids]
+    for future in futures:
+        future.result()
batch_event_message.end_record()
logger.info(event_message=batch_event_message)
if not processed_notice_ids:
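The operator change replaces the sequential per-notice loop with a thread fan-out: each notice goes through multithread_notice_processor, and the final future.result() loop acts as a barrier that also re-raises anything a worker thread failed to catch. A simplified, runnable sketch of the pattern (all names other than ThreadPoolExecutor are stand-ins):

```python
from concurrent.futures import ThreadPoolExecutor

def process_notice(notice_id: str) -> None:
    # stands in for multithread_notice_processor, which catches and
    # logs its own errors via log_notice_error
    print(f"processing {notice_id}")

def process_batch(notice_ids: list) -> None:
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_notice, nid) for nid in notice_ids]
        for future in futures:
            # blocks until the worker finishes and re-raises any uncaught
            # exception, so a broken notice fails the batch task visibly
            future.result()

process_batch(["notice-1", "notice-2", "notice-3"])
```

Note that ThreadPoolExecutor() without max_workers defaults to min(32, os.cpu_count() + 4), so the effective per-batch parallelism is machine-dependent.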
25 changes: 25 additions & 0 deletions dags/pipelines/notice_batch_processor_pipelines.py
@@ -30,3 +30,28 @@ def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: M
for notice in notices:
notice_repository.update(notice=notice)
return notice_ids


+def notice_batch_transformer_pipeline(notice_ids: List[str], mongodb_client: MongoClient) -> List[str]:
+    """
+
+    :param notice_ids:
+    :param mongodb_client:
+    :return:
+    """
+    from ted_sws.notice_transformer.adapters.notice_batch_transformer import MappingSuiteTransformationPool
+    from ted_sws.notice_transformer.services.notice_batch_transformer import notice_batch_transformer
+    from concurrent.futures import ThreadPoolExecutor
+
+    mapping_transformation_pool = MappingSuiteTransformationPool(mongodb_client=mongodb_client,
+                                                                 transformation_timeout=300)
+    result_notice_ids = []
+    with ThreadPoolExecutor() as executor:
+        features = [executor.submit(notice_batch_transformer, notice_id, mapping_transformation_pool) for notice_id in
+                    notice_ids]
+        for feature in features:
+            result_notice_id = feature.result()
+            if result_notice_id:
+                result_notice_ids.append(result_notice_id)
+    mapping_transformation_pool.close()
+    return result_notice_ids
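The new batch transformer shares a single MappingSuiteTransformationPool across all threads and releases it only after every future has resolved. A hedged usage sketch, assuming a locally reachable MongoDB and the ted_sws package on the path; the try/finally is an addition here to guarantee cleanup:

```python
from pymongo import MongoClient

from ted_sws.notice_transformer.adapters.notice_batch_transformer import MappingSuiteTransformationPool
from ted_sws.notice_transformer.services.notice_batch_transformer import notice_batch_transformer

mongodb_client = MongoClient("mongodb://localhost:27017")  # assumption: local instance
pool = MappingSuiteTransformationPool(mongodb_client=mongodb_client,
                                      transformation_timeout=300)
try:
    # notice_batch_transformer returns the notice id on success and a falsy
    # value otherwise, mirroring the filtering done in the pipeline above
    transformed = [nid for nid in ("notice-1", "notice-2")
                   if notice_batch_transformer(nid, pool)]
finally:
    pool.close()
```

Incidentally, the variables features/feature in the new pipeline are conventionally spelled futures/future; behaviour is unaffected.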
9 changes: 6 additions & 3 deletions dags/pipelines/notice_processor_pipelines.py
@@ -15,9 +15,12 @@ def notice_normalisation_pipeline(notice: Notice, mongodb_client: MongoClient = 
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice
notice.update_status_to(new_status=NoticeStatus.RAW)
indexed_notice = index_notice(notice=notice)
-normalised_notice = normalise_notice(notice=indexed_notice)
-
-return NoticePipelineOutput(notice=normalised_notice)
+try:
+    normalised_notice = normalise_notice(notice=indexed_notice)
+    return NoticePipelineOutput(notice=normalised_notice)
+except Exception as error_message:
+    return NoticePipelineOutput(notice=indexed_notice, processed=False, store_result=True,
+                                error_message=str(error_message))


def notice_transformation_pipeline(notice: Notice, mongodb_client: MongoClient) -> NoticePipelineOutput:
3 changes: 2 additions & 1 deletion dags/pipelines/pipeline_protocols.py
@@ -7,10 +7,11 @@

class NoticePipelineOutput:

-def __init__(self, notice: Notice, processed: bool = True, store_result: bool = True):
+def __init__(self, notice: Notice, processed: bool = True, store_result: bool = True, error_message: str = None):
self.notice = notice
self.processed = processed
self.store_result = store_result
+self.error_message = error_message


class NoticePipelineCallable(Protocol):
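With error_message on NoticePipelineOutput, a step can report failure as data instead of raising; this is how the reworked notice_normalisation_pipeline and the operator's new error_message handling fit together. A minimal sketch of the convention, assuming the repo is on the path (safe_step and the step callable are illustrative names):

```python
from dags.pipelines.pipeline_protocols import NoticePipelineOutput

def safe_step(notice, step):
    """Run one pipeline step; convert exceptions into a stored,
    non-processed output instead of aborting the whole batch."""
    try:
        return NoticePipelineOutput(notice=step(notice))
    except Exception as error:
        return NoticePipelineOutput(notice=notice, processed=False,
                                    store_result=True, error_message=str(error))
```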
6 changes: 3 additions & 3 deletions infra/airflow-cluster/Dockerfile
@@ -1,4 +1,4 @@
-FROM docker.io/apache/airflow:2.5.1-python3.8
+FROM docker.io/apache/airflow:2.7.1-python3.8

# quick sudo
USER root
@@ -14,7 +14,7 @@ COPY requirements.txt /opt/airflow
# working in the /opt/airflow
WORKDIR /opt/airflow
RUN mkdir -p ./.rmlmapper
-RUN wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.1.3/rmlmapper-6.1.3-r367-all.jar -O ./.rmlmapper/rmlmapper.jar
+RUN wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.2.1/rmlmapper-6.2.1-r368-all.jar -O ./.rmlmapper/rmlmapper.jar


RUN wget -c https://kumisystems.dl.sourceforge.net/project/saxon/Saxon-HE/10/Java/SaxonHE10-6J.zip -P .saxon/
@@ -26,4 +26,4 @@ RUN wget -c https://github.com/dice-group/LIMES/releases/download/1.7.9/limes.ja


RUN pip install --upgrade pip
-RUN pip install --no-cache-dir -r requirements.txt --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-no-providers-3.8.txt"
+RUN pip install --no-cache-dir -r requirements.txt --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-no-providers-3.8.txt"
4 changes: 3 additions & 1 deletion infra/airflow-cluster/docker-compose-worker.yaml
@@ -54,7 +54,9 @@ x-airflow-common:
AIRFLOW__CORE__PARALLELISM: 256
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 256
AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT: 256
-AIRFLOW__SCHEDULER__PARSING_PROCESSES: 24
+AIRFLOW__SCHEDULER__PARSING_PROCESSES: 8
+AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC: 1
+AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE: 128
AIRFLOW__CELERY__WORKER_CONCURRENCY: ${AIRFLOW_CELERY_WORKER_CONCURRENCY}
AIRFLOW__CORE__SQL_ALCHEMY_POOL_SIZE: 512
AIRFLOW__CORE__SQL_ALCHEMY_MAX_OVERFLOW: 1024
2 changes: 1 addition & 1 deletion infra/airflow-cluster/docker-compose.yaml
@@ -57,7 +57,7 @@ x-airflow-common:
AIRFLOW__SCHEDULER__PARSING_PROCESSES: 8
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC: 1
AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE: 128
-AIRFLOW__CELERY__WORKER_CONCURRENCY: 16
+AIRFLOW__CELERY__WORKER_CONCURRENCY: 1
AIRFLOW__CORE__SQL_ALCHEMY_POOL_SIZE: 512
AIRFLOW__CORE__SQL_ALCHEMY_MAX_OVERFLOW: 1024
AIRFLOW__CORE__SQL_ALCHEMY_CONN: "postgresql+psycopg2://${AIRFLOW_POSTGRES_USER}:${AIRFLOW_POSTGRES_PASSWORD}@postgres/${AIRFLOW_POSTGRES_DB_NAME}"
6 changes: 3 additions & 3 deletions infra/airflow/Dockerfile
@@ -1,4 +1,4 @@
-FROM docker.io/apache/airflow:2.5.1-python3.8
+FROM docker.io/apache/airflow:2.7.1-python3.8

# quick sudo
USER root
@@ -14,7 +14,7 @@ COPY requirements.txt /opt/airflow
# working in the /opt/airflow
WORKDIR /opt/airflow
RUN mkdir -p ./.rmlmapper ./dags ./ted_sws ./temp
-RUN wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.1.3/rmlmapper-6.1.3-r367-all.jar -O ./.rmlmapper/rmlmapper.jar
+RUN wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.2.1/rmlmapper-6.2.1-r368-all.jar -O ./.rmlmapper/rmlmapper.jar


RUN wget -c https://kumisystems.dl.sourceforge.net/project/saxon/Saxon-HE/10/Java/SaxonHE10-6J.zip -P .saxon/
@@ -25,4 +25,4 @@ RUN wget -c https://github.com/dice-group/LIMES/releases/download/1.7.9/limes.ja


RUN pip install --upgrade pip
-RUN pip install --no-cache-dir -r requirements.txt --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-no-providers-3.8.txt"
+RUN pip install --no-cache-dir -r requirements.txt --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-no-providers-3.8.txt"
1 change: 1 addition & 0 deletions infra/airflow/docker-compose.yaml
@@ -57,6 +57,7 @@ x-airflow-common:
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://${AIRFLOW_POSTGRES_USER}:${AIRFLOW_POSTGRES_PASSWORD}@postgres/${AIRFLOW_POSTGRES_DB_NAME}
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
+AIRFLOW__CELERY__WORKER_CONCURRENCY: 1
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__ENABLE_XCOM_PICKLING: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
20 changes: 10 additions & 10 deletions requirements.dev.txt
@@ -1,12 +1,12 @@
# Development libraries
-coverage~=6.3.1
-pytest~=7.0.0
-pytest-bdd~=5.0.0
-pytest-cov~=3.0.0
-pytest-subtests~=0.6.0
-tox~=3.24.5
+allure-pytest-bdd~=2.13.2
+coverage~=7.3.1
+mongomock~=4.1.2
+pycurl~=7.45.2
+pytest~=7.4.2
+pytest-bdd~=6.1.1
+pytest-cov~=4.1.0
+pytest-subtests~=0.11.0
+tox~=4.11.3
tox-pytest-summary~=0.1.2
-mongomock==4.1.2
-uvicorn[standard]
-allure-pytest-bdd==2.10.0
-pycurl~=7.45.2
+uvicorn[standard]~=0.23.2
50 changes: 25 additions & 25 deletions requirements.txt
@@ -1,28 +1,28 @@
# Project dependecies
-pydantic~=1.9.0
-requests~=2.28.2
-deepdiff~=5.7.0
-jinja2~=3.1.2
-python-dotenv~=0.19.2
-pymongo~=4.0.1
-apache-airflow~=2.5.1
-hvac==0.11.2
-SPARQLWrapper==1.8.5
-pandas==1.5.2
-click~=8.1.0
-openpyxl==3.0.9
-colorama~=0.4
-fastapi~=0.77
-python-dateutil~=2.8.2
-rdflib~=6.1.1
-pyshacl~=0.19.0
-agraph-python==101.0.10
+agraph-python~=102.0.0
+apache-airflow~=2.7.1
+certifi~=2023.7.22
+click~=8.1.7
+colorama~=0.4.6
decorator~=5.1.1
-urllib3[secure]
-semantic-version==2.10.0
-paramiko~=3.0.0
-ordered-set~=4.0.2
+deepdiff~=6.5.0
+fastapi~=0.103.0
+hvac~=1.2.1
+jinja2~=3.1.2
json2html~=1.3.0
-minio~=7.1.1
-certifi~=2022.12.7
-shortuuid~=1.0.11
+minio~=7.1.16
+openpyxl~=3.1.2
+ordered-set~=4.1.0
+paramiko~=3.3.1
+pandas~=2.0.3
+pydantic~=2.3.0
+pymongo~=4.5.0
+pyshacl~=0.23.0
+python-dateutil~=2.8.2
+python-dotenv~=1.0.0
+rdflib~=6.3.2
+requests~=2.31.0
+semantic-version~=2.10.0
+shortuuid~=1.0.11
+SPARQLWrapper~=2.0.0
+urllib3[secure]~=1.26.16
9 changes: 8 additions & 1 deletion ted_sws/__init__.py
@@ -262,9 +262,16 @@ def S3_PUBLISH_ENABLED(self, config_value: str) -> bool:
return config_value.lower() in ["1", "true"]


+class AirflowConfig:
+
+    @env_property(config_resolver_class=AirflowAndEnvConfigResolver, default_value="4")
+    def AIRFLOW_NUMBER_OF_WORKERS(self, config_value: str) -> int:
+        return int(config_value)


class TedConfigResolver(MongoDBConfig, RMLMapperConfig, XMLProcessorConfig, ELKConfig, LoggingConfig,
GitHubArtefacts, API, AllegroConfig, TedAPIConfig, SFTPConfig, FusekiConfig,
-SPARQLConfig, LimesAlignmentConfig, S3PublishConfig):
+SPARQLConfig, LimesAlignmentConfig, S3PublishConfig, AirflowConfig):
"""
This class resolve the secrets of the ted-sws project.
"""
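AirflowConfig exposes the worker count through the env_property resolver with a default of "4". One detail worth verifying: the operator module reads config.NUMBER_OF_AIRFLOW_WORKERS while the property here is named AIRFLOW_NUMBER_OF_WORKERS; whether the resolver aliases the two is not visible in this diff. Functionally, the resolution is expected to behave like this stand-in (the direct os.environ lookup is an assumption; the real AirflowAndEnvConfigResolver may consult Airflow variables first):

```python
import os

def airflow_number_of_workers() -> int:
    # stand-in for the @env_property resolver above: look up the setting,
    # fall back to the declared default "4", and coerce it to int
    return int(os.environ.get("AIRFLOW_NUMBER_OF_WORKERS", "4"))

print(airflow_number_of_workers())  # 4 unless the variable is set
```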
26 changes: 13 additions & 13 deletions ted_sws/core/model/__init__.py
@@ -22,24 +22,24 @@ class PropertyBaseModel(BaseModel):
https://github.com/samuelcolvin/pydantic/issues/935
is solved
"""

-    @classmethod
-    def get_properties(cls):
-        return [
-            prop for prop in dir(cls)
-            if isinstance(getattr(cls, prop), property) and prop not in ("__values__", "fields")
-        ]
-
-    def dict(self, *args, **kwargs) -> 'DictStrAny':
-        self.__dict__.update({prop: getattr(self, prop) for prop in self.get_properties()})
-
-        return super().dict(*args, **kwargs)
+    #TODO : remove this code, because this implementation is not necessary anymore in pydantic 2
+    # @classmethod
+    # def get_properties(cls):
+    #     return [
+    #         prop for prop in dir(cls)
+    #         if isinstance(getattr(cls, prop), property) and prop not in ("__values__", "fields")
+    #     ]
+    #
+    #TODO : remove this code, because this implementation is not necessary anymore in pydantic 2
+    # def dict(self, *args, **kwargs) -> 'DictStrAny':
+    #     self.__dict__.update({prop: getattr(self, prop) for prop in self.get_properties()})
+    #     return super().dict(*args, **kwargs)

def __eq__(self, other) -> bool:
if not isinstance(other, self.__class__) or not other:
return False
# raise ValueError(f"Must compare objects of the same class {self.__class__}")
-difference = DeepDiff(self.dict(), other.dict())
+difference = DeepDiff(self.model_dump(), other.model_dump())
return not difference

def __ne__(self, other):
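The PropertyBaseModel cleanup tracks the Pydantic 1 to 2 migration pulled in by requirements.txt: the dict() override and the property-injection workaround are commented out pending removal, and equality now diffs model_dump() output. A self-contained sketch of the surviving idiom (the Point model is hypothetical):

```python
from deepdiff import DeepDiff
from pydantic import BaseModel

class Point(BaseModel):  # hypothetical model, for illustration only
    x: int
    y: int

a, b = Point(x=1, y=2), Point(x=1, y=2)
# model_dump() is Pydantic 2's replacement for the deprecated dict();
# an empty DeepDiff result means the two models are equal
assert not DeepDiff(a.model_dump(), b.model_dump())
```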