diff --git a/dags/common/models/library/library_cern_publication_records.py b/dags/common/models/library/library_cern_publication_records.py
new file mode 100644
index 0000000..88fe8ee
--- /dev/null
+++ b/dags/common/models/library/library_cern_publication_records.py
@@ -0,0 +1,15 @@
+from sqlalchemy import Column, DateTime, Float, Integer, func
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class LibraryCernPublicationRecords(Base):
+    __tablename__ = "library_cern_publication_records"
+
+    year = Column(Integer, primary_key=True)
+    publications_total_count = Column(Float)
+    conference_proceedings_count = Column(Float)
+    non_journal_proceedings_count = Column(Float)
+    created_at = Column(DateTime, default=func.now())
+    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
diff --git a/dags/common/models/library/library_new_items_in_the_institutional_repository.py b/dags/common/models/library/library_new_items_in_the_institutional_repository.py
new file mode 100644
index 0000000..bece01a
--- /dev/null
+++ b/dags/common/models/library/library_new_items_in_the_institutional_repository.py
@@ -0,0 +1,14 @@
+from sqlalchemy import Column, DateTime, Float, Integer, func
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class LibraryNewItemsInTheInstitutionalRepository(Base):
+    __tablename__ = "library_items_in_the_institutional_repository"
+
+    year = Column(Integer, primary_key=True)
+    inspire_arxiv_records = Column(Float)
+    inspire_curators_records = Column(Float)
+    created_at = Column(DateTime, default=func.now())
+    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
diff --git a/dags/common/models/open_access/oa_golden_open_access.py b/dags/common/models/open_access/oa_golden_open_access.py
new file mode 100644
index 0000000..97ff9e0
--- /dev/null
+++ b/dags/common/models/open_access/oa_golden_open_access.py
@@ -0,0 +1,17 @@
+from sqlalchemy import Column, DateTime, Float, Integer, func
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class OAGoldenOpenAccess(Base):
+    __tablename__ = "oa_golden_open_access"
+
+    year = Column(Integer, primary_key=True)
+    cern_read_and_publish = Column(Float)
+    cern_individual_apcs = Column(Float)
+    scoap3 = Column(Float)
+    other = Column(Float)
+    other_collective_models = Column(Float)
+    created_at = Column(DateTime, default=func.now())
+    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
diff --git a/dags/common/models/open_access/open_access.py b/dags/common/models/open_access/open_access.py
new file mode 100644
index 0000000..fc3fc48
--- /dev/null
+++ b/dags/common/models/open_access/open_access.py
@@ -0,0 +1,16 @@
+from sqlalchemy import Column, DateTime, Float, Integer, func
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class OAOpenAccess(Base):
+    __tablename__ = "oa_open_access"
+
+    year = Column(Integer, primary_key=True)
+    closed_access = Column(Float)
+    bronze_open_access = Column(Float)
+    green_open_access = Column(Float)
+    gold_open_access = Column(Float)
+    created_at = Column(DateTime, default=func.now())
+    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
diff --git a/dags/common/operators/sqlalchemy_operator.py b/dags/common/operators/sqlalchemy_operator.py
new file mode 100644
index 0000000..a247b02
--- /dev/null
+++ b/dags/common/operators/sqlalchemy_operator.py
@@ -0,0 +1,34 @@
+from functools import wraps
+
+from airflow.decorators import task
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+from executor_config import kubernetes_executor_config
+from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.orm import sessionmaker
+
+
+def get_session(conn_id: str):
+    hook = PostgresHook(postgres_conn_id=conn_id)
+    engine = hook.get_sqlalchemy_engine()
+    return sessionmaker(bind=engine)()
+
+
+def sqlalchemy_task(conn_id: str):
+    def decorator(func):
+        @task(executor_config=kubernetes_executor_config)
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            session = get_session(conn_id)
+            try:
+                result = func(*args, session=session, **kwargs)
+                session.commit()
+                return result
+            except SQLAlchemyError:
+                session.rollback()
+                raise
+            finally:
+                session.close()
+
+        return wrapper
+
+    return decorator
diff --git a/dags/library/cern_publication_records.py b/dags/library/cern_publication_records.py
index 9eedeed..6be4fb2 100644
--- a/dags/library/cern_publication_records.py
+++ b/dags/library/cern_publication_records.py
@@ -4,10 +4,14 @@
 
 import pendulum
 from airflow.decorators import dag, task
-from airflow.providers.postgres.operators.postgres import PostgresOperator
+from common.models.library.library_cern_publication_records import (
+    LibraryCernPublicationRecords,
+)
+from common.operators.sqlalchemy_operator import sqlalchemy_task
 from common.utils import get_total_results_count, request_again_if_failed
 from executor_config import kubernetes_executor_config
 from library.utils import get_url
+from sqlalchemy.sql import func
 
 
 @dag(
@@ -31,7 +35,7 @@ def fetch_data_task(key, **kwargs):
     @task(multiple_outputs=True, executor_config=kubernetes_executor_config)
     def join(values, **kwargs):
         results = reduce(lambda a, b: {**a, **b}, values)
-        results["years"] = kwargs["params"].get("year")
+        results["year"] = kwargs["params"].get("year")
         return results
 
     results = fetch_data_task.expand(
@@ -43,35 +47,32 @@ def join(values, **kwargs):
     )
 
     unpacked_results = join(results)
-    PostgresOperator(
-        task_id="populate_library_cern_publication_records_table",
-        postgres_conn_id="superset",
-        sql="""
-        INSERT INTO library_cern_publication_records (year,
-        publications_total_count, conference_proceedings_count,
-        non_journal_proceedings_count, created_at, updated_at)
-        VALUES (%(years)s, %(publications_total_count)s,
-        %(conference_proceedings_count)s, %(non_journal_proceedings_count)s,
-        CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
-        ON CONFLICT (year)
-        DO UPDATE SET
-        publications_total_count = EXCLUDED.publications_total_count,
-        conference_proceedings_count = EXCLUDED.conference_proceedings_count,
-        non_journal_proceedings_count = EXCLUDED.non_journal_proceedings_count,
-        updated_at = CURRENT_TIMESTAMP;
-        """,
-        parameters={
-            "years": unpacked_results["years"],
-            "publications_total_count": unpacked_results["publications_total_count"],
-            "conference_proceedings_count": unpacked_results[
+    @sqlalchemy_task(conn_id="superset_qa")
+    def populate_cern_publication_records(results, session, **kwargs):
+        record = (
+            session.query(LibraryCernPublicationRecords)
+            .filter_by(year=results["year"])
+            .first()
+        )
+        if record:
+            record.publications_total_count = results["publications_total_count"]
+            record.conference_proceedings_count = results[
                 "conference_proceedings_count"
-            ],
-            "non_journal_proceedings_count": unpacked_results[
+            ]
+            record.non_journal_proceedings_count = results[
                 "non_journal_proceedings_count"
-            ],
-        },
-        executor_config=kubernetes_executor_config,
-    )
+            ]
+            record.updated_at = func.now()
+        else:
+            new_record = LibraryCernPublicationRecords(
+                year=results["year"],
+                publications_total_count=results["publications_total_count"],
+                conference_proceedings_count=results["conference_proceedings_count"],
+                non_journal_proceedings_count=results["non_journal_proceedings_count"],
+            )
+            session.add(new_record)
+
+    populate_cern_publication_records(unpacked_results)
 
 
 library_cern_publication_records = library_cern_publication_records_dag()
diff --git a/dags/library/new_items_in_the_institutional_repository.py b/dags/library/new_items_in_the_institutional_repository.py
index 5d85c9a..134cf97 100644
--- a/dags/library/new_items_in_the_institutional_repository.py
+++ b/dags/library/new_items_in_the_institutional_repository.py
@@ -4,9 +4,13 @@
 import pendulum
 from airflow.decorators import dag, task
 from airflow.providers.http.hooks.http import HttpHook
-from airflow.providers.postgres.operators.postgres import PostgresOperator
+from common.models.library.library_new_items_in_the_institutional_repository import (
+    LibraryNewItemsInTheInstitutionalRepository,
+)
+from common.operators.sqlalchemy_operator import sqlalchemy_task
 from common.utils import get_total_results_count
 from executor_config import kubernetes_executor_config
+from sqlalchemy.sql import func
 from tenacity import retry_if_exception_type, stop_after_attempt
 
 
@@ -56,25 +60,26 @@ def join_and_add_year(counts, **kwargs):
 
     results = join_and_add_year(counts)
 
-    populate_items_in_the_institutional_repository = PostgresOperator(
-        task_id="populate_items_in_the_institutional_repository",
-        postgres_conn_id="superset_qa",
-        sql="""
-        INSERT INTO items_in_the_institutional_repository (year,
-        inspire_arxiv_records, inspire_curators_records, created_at, updated_at)
-        VALUES (%(year)s, %(inspire_arxiv_records)s, %(inspire_curators_records)s,
-        CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
-        ON CONFLICT (year)
-        DO UPDATE SET
-        inspire_arxiv_records = EXCLUDED.inspire_arxiv_records,
-        inspire_curators_records = EXCLUDED.inspire_curators_records,
-        updated_at = CURRENT_TIMESTAMP;
-        """,
-        parameters=results,
-        executor_config=kubernetes_executor_config,
-    )
+    @sqlalchemy_task(conn_id="superset_qa")
+    def populate_new_items_in_the_institutional_repository(results, session, **kwargs):
+        record = (
+            session.query(LibraryNewItemsInTheInstitutionalRepository)
+            .filter_by(year=results["year"])
+            .first()
+        )
+        if record:
+            record.inspire_arxiv_records = results["inspire_arxiv_records"]
+            record.inspire_curators_records = results["inspire_curators_records"]
+            record.updated_at = func.now()
+        else:
+            new_record = LibraryNewItemsInTheInstitutionalRepository(
+                year=results["year"],
+                inspire_arxiv_records=results["inspire_arxiv_records"],
+                inspire_curators_records=results["inspire_curators_records"],
+            )
+            session.add(new_record)
 
-    counts >> results >> populate_items_in_the_institutional_repository
+    populate_new_items_in_the_institutional_repository(results)
 
 
 Library_new_items_in_the_institutional_repository = (
diff --git a/dags/migrations/versions/101f23913167_library_kpis_new_items_in_the_.py b/dags/migrations/versions/101f23913167_library_kpis_new_items_in_the_.py
index c31da7d..9c009d8 100644
--- a/dags/migrations/versions/101f23913167_library_kpis_new_items_in_the_.py
+++ b/dags/migrations/versions/101f23913167_library_kpis_new_items_in_the_.py
@@ -19,7 +19,7 @@
 
 def upgrade():
     op.create_table(
-        "items_in_the_institutional_repository",
+        "library_items_in_the_institutional_repository",
         sa.Column("year", sa.Integer, primary_key=True),
         sa.Column("inspire_arxiv_records", sa.Integer, nullable=False),
         sa.Column("inspire_curators_records", sa.Integer, nullable=False),
@@ -29,4 +29,4 @@
 
 
 def downgrade():
-    op.drop_table("items_in_the_institutional_repository")
+    op.drop_table("library_items_in_the_institutional_repository")
diff --git a/dags/open_access/gold_open_access_mechanisms.py b/dags/open_access/gold_open_access_mechanisms.py
index f300b42..f11d651 100644
--- a/dags/open_access/gold_open_access_mechanisms.py
+++ b/dags/open_access/gold_open_access_mechanisms.py
@@ -4,9 +4,11 @@
 import pendulum
 from airflow.decorators import dag, task
 from airflow.providers.http.hooks.http import HttpHook
-from airflow.providers.postgres.operators.postgres import PostgresOperator
+from common.models.open_access.oa_golden_open_access import OAGoldenOpenAccess
+from common.operators.sqlalchemy_operator import sqlalchemy_task
 from common.utils import get_total_results_count
 from executor_config import kubernetes_executor_config
+from sqlalchemy.sql import func
 from tenacity import retry_if_exception_type, stop_after_attempt
 
 
@@ -68,29 +70,30 @@ def join_and_add_year(counts, **kwargs):
 
     results = join_and_add_year(counts)
 
-    populate_golden_open_access = PostgresOperator(
-        task_id="populate_golden_open_access",
-        postgres_conn_id="superset",
-        sql="""
-        INSERT INTO oa_golden_open_access (year, cern_read_and_publish, cern_individual_apcs,
-        scoap3, other, other_collective_models, created_at, updated_at)
-        VALUES (%(year)s, %(cern_read_and_publish)s, %(cern_individual_apcs)s,
-        %(scoap3)s, %(other)s, %(other_collective_models)s,
-        CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
-        ON CONFLICT (year)
-        DO UPDATE SET
-        cern_read_and_publish = EXCLUDED.cern_read_and_publish,
-        cern_individual_apcs = EXCLUDED.cern_individual_apcs,
-        scoap3 = EXCLUDED.scoap3,
-        other = EXCLUDED.other,
-        other_collective_models = EXCLUDED.other_collective_models,
-        updated_at = CURRENT_TIMESTAMP;
-        """,
-        parameters=results,
-        executor_config=kubernetes_executor_config,
-    )
+    @sqlalchemy_task(conn_id="superset_qa")
+    def populate_golden_open_access(results, session, **kwargs):
+        record = (
+            session.query(OAGoldenOpenAccess).filter_by(year=results["year"]).first()
+        )
+        if record:
+            record.cern_read_and_publish = results["cern_read_and_publish"]
+            record.cern_individual_apcs = results["cern_individual_apcs"]
+            record.scoap3 = results["scoap3"]
+            record.other = results["other"]
+            record.other_collective_models = results["other_collective_models"]
+            record.updated_at = func.now()
+        else:
+            new_record = OAGoldenOpenAccess(
+                year=results["year"],
+                cern_read_and_publish=results["cern_read_and_publish"],
+                cern_individual_apcs=results["cern_individual_apcs"],
+                scoap3=results["scoap3"],
+                other=results["other"],
+                other_collective_models=results["other_collective_models"],
+            )
+            session.add(new_record)
 
-    counts >> results >> populate_golden_open_access
+    populate_golden_open_access(results)
 
 
 OA_gold_open_access_mechanisms = oa_gold_open_access_mechanisms()
diff --git a/dags/open_access/open_access.py b/dags/open_access/open_access.py
index bdf7c1f..b3fb66c 100644
--- a/dags/open_access/open_access.py
+++ b/dags/open_access/open_access.py
@@ -4,9 +4,11 @@
 import open_access.utils as utils
 import pendulum
 from airflow.decorators import dag, task
-from airflow.providers.postgres.operators.postgres import PostgresOperator
+from common.models.open_access.open_access import OAOpenAccess
+from common.operators.sqlalchemy_operator import sqlalchemy_task
 from common.utils import get_total_results_count, request_again_if_failed
 from executor_config import kubernetes_executor_config
+from sqlalchemy.sql import func
 
 
 @dag(
@@ -36,44 +38,39 @@ def fetch_data_task(query, **kwargs):
     @task(multiple_outputs=True, executor_config=kubernetes_executor_config)
     def join(values, **kwargs):
         results = reduce(lambda a, b: {**a, **b}, values)
-        results["years"] = kwargs["params"].get("year")
+        results["year"] = kwargs["params"].get("year")
         return results
 
     results = fetch_data_task.expand(
         query=[
-            {"closed": constants.CLOSED_ACCESS},
-            {"bronze": constants.BRONZE_ACCESS},
-            {"green": constants.GREEN_ACCESS},
-            {"gold": constants.GOLD_ACCESS},
+            {"closed_access": constants.CLOSED_ACCESS},
+            {"bronze_open_access": constants.BRONZE_ACCESS},
+            {"green_open_access": constants.GREEN_ACCESS},
+            {"gold_open_access": constants.GOLD_ACCESS},
         ],
     )
 
     unpacked_results = join(results)
-    PostgresOperator(
-        task_id="populate_open_access_table",
-        postgres_conn_id="superset",
-        sql="""
-        INSERT INTO oa_open_access (year, closed_access, bronze_open_access,
-        green_open_access, gold_open_access, created_at, updated_at)
-        VALUES (%(years)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s,
-        CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
-        ON CONFLICT (year)
-        DO UPDATE SET
-        closed_access = EXCLUDED.closed_access,
-        bronze_open_access = EXCLUDED.bronze_open_access,
-        green_open_access = EXCLUDED.green_open_access,
-        gold_open_access = EXCLUDED.gold_open_access,
-        updated_at = CURRENT_TIMESTAMP;
-        """,
-        parameters={
-            "years": unpacked_results["years"],
-            "closed": unpacked_results["closed"],
-            "bronze": unpacked_results["bronze"],
-            "green": unpacked_results["green"],
-            "gold": unpacked_results["gold"],
-        },
-        executor_config=kubernetes_executor_config,
-    )
+    @sqlalchemy_task(conn_id="superset_qa")
+    def populate_open_access(results, session, **kwargs):
+        record = session.query(OAOpenAccess).filter_by(year=results["year"]).first()
+        if record:
+            record.closed_access = results["closed_access"]
+            record.bronze_open_access = results["bronze_open_access"]
+            record.green_open_access = results["green_open_access"]
+            record.gold_open_access = results["gold_open_access"]
+            record.updated_at = func.now()
+        else:
+            new_record = OAOpenAccess(
+                year=results["year"],
+                closed_access=results["closed_access"],
+                bronze_open_access=results["bronze_open_access"],
+                green_open_access=results["green_open_access"],
+                gold_open_access=results["gold_open_access"],
+            )
+            session.add(new_record)
+
+    populate_open_access(unpacked_results)
 
 
 OA_dag = oa_dag()