From 1b66dc78c138a0a1ab87c0664d40a6935beb909d Mon Sep 17 00:00:00 2001 From: ErnestaP Date: Thu, 1 Aug 2024 17:47:04 +0200 Subject: [PATCH] OA: parsing each open access category sequentially --- dags/open_access/open_access.py | 104 +++++++++++++++----------------- dags/open_access/parsers.py | 21 ++++--- dags/open_access/utils.py | 56 ++++++++++++++++- 3 files changed, 115 insertions(+), 66 deletions(-) diff --git a/dags/open_access/open_access.py b/dags/open_access/open_access.py index dc9ac5d..532fc59 100644 --- a/dags/open_access/open_access.py +++ b/dags/open_access/open_access.py @@ -1,17 +1,17 @@ -from functools import reduce - import open_access.constants as constants -import open_access.utils as utils import pendulum from airflow.decorators import dag, task -from airflow.exceptions import AirflowException -from airflow.providers.http.hooks.http import HttpHook from common.models.open_access.open_access import OAOpenAccess from common.operators.sqlalchemy_operator import sqlalchemy_task -from common.utils import get_total_results_count from executor_config import kubernetes_executor_config +from open_access.utils import ( + fetch_count_from_comments, + fetch_count_from_parsed_records, + generate_params, + get_golden_access_count, + get_green_access_count, +) from sqlalchemy.sql import func -from tenacity import retry_if_exception_type, stop_after_attempt @dag( @@ -20,64 +20,58 @@ params={"year": 2023}, ) def oa_dag(): - @task(multiple_outputs=True, executor_config=kubernetes_executor_config) - def generate_params(query_object, **kwargs): + @task(executor_config=kubernetes_executor_config) + def fetch_closed_access_count(query_object, **kwargs): year = kwargs["params"].get("year") - current_collection = "Published+Articles" - base_query = ( - r"(affiliation:CERN+or+595:'For+annual+report')" - + rf"and+year:{year}+not+980:ConferencePaper+" - + r"not+980:BookChapter" - ) - type_of_query = [*query_object][0] - query = rf"{base_query}+{query_object[type_of_query]}" + parameters = generate_params(query_object, year) + return fetch_count_from_comments(parameters=parameters) - return { - "endpoint": rf"search?ln=en&cc={current_collection}&p={query}" - + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c=" - + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm", - "type_of_query": type_of_query, - } + @task(executor_config=kubernetes_executor_config) + def fetch_bronze_open_access_count(query_object, closed_access, **kwargs): + year = kwargs["params"].get("year") + parameters = generate_params(query_object, year) + return fetch_count_from_comments(parameters=parameters, previous=closed_access) @task(executor_config=kubernetes_executor_config) - def fetch_count(parameters): - http_hook = HttpHook(http_conn_id="cds", method="GET") - response = http_hook.run_with_advanced_retry( - endpoint=parameters["endpoint"], - _retry_args={ - "stop": stop_after_attempt(3), - "retry": retry_if_exception_type(AirflowException), - }, + def fetch_gold_open_access_count(query_object, bronze_access, **kwargs): + year = kwargs["params"].get("year") + parameters = generate_params(query_object, year) + return fetch_count_from_parsed_records( + parameters=parameters, + count_function=get_golden_access_count, + previous=bronze_access, ) - type_of_query = parameters["type_of_query"] - endpoint = parameters["endpoint"] - count = get_total_results_count(response.text) - if type_of_query == "gold_open_access": - total_gold = utils.get_golden_access_count(count, endpoint) - return {parameters["type_of_query"]: total_gold} - elif type_of_query == "green_open_access": - total_green = utils.get_green_access_count(count, endpoint) - return {parameters["type_of_query"]: total_green} - return {parameters["type_of_query"]: count} - queries_objects_list = [ - {"closed_access": constants.CLOSED_ACCESS}, - {"bronze_open_access": constants.BRONZE_ACCESS}, - {"green_open_access": constants.GREEN_ACCESS}, - {"gold_open_access": constants.GOLD_ACCESS}, - ] + @task(executor_config=kubernetes_executor_config) + def fetch_green_open_access_count(query_object, gold_access, **kwargs): + year = kwargs["params"].get("year") + parameters = generate_params(query_object, year) + return fetch_count_from_parsed_records( + parameters=parameters, + count_function=get_green_access_count, + previous=gold_access, + ) - parameters = generate_params.expand(query_object=queries_objects_list) - counts = fetch_count.expand(parameters=parameters) + closed_access = fetch_closed_access_count( + {"closed_access": constants.CLOSED_ACCESS} + ) + closed_bronze_access = fetch_bronze_open_access_count( + {"bronze_open_access": constants.BRONZE_ACCESS}, closed_access + ) + closed_bronze_gold_access = fetch_gold_open_access_count( + {"gold_open_access": constants.GOLD_ACCESS}, closed_bronze_access + ) + closed_bronze_gold_green_access = fetch_green_open_access_count( + {"green_open_access": constants.GREEN_ACCESS}, closed_bronze_gold_access + ) - @task(multiple_outputs=True, executor_config=kubernetes_executor_config) - def join_and_add_year(counts, **kwargs): + @task(executor_config=kubernetes_executor_config) + def add_year(closed_bronze_gold_green_access, **kwargs): year = kwargs["params"].get("year") - results = reduce(lambda a, b: {**a, **b}, counts) - results["year"] = year - return results + closed_bronze_gold_green_access["year"] = year + return closed_bronze_gold_green_access - results = join_and_add_year(counts) + results = add_year(closed_bronze_gold_green_access) @sqlalchemy_task(conn_id="superset") def populate_open_access(results, session, **kwargs): diff --git a/dags/open_access/parsers.py b/dags/open_access/parsers.py index 5b82694..46faea1 100644 --- a/dags/open_access/parsers.py +++ b/dags/open_access/parsers.py @@ -78,14 +78,15 @@ def parse_subset_green_access(records): for record in records: datafields_856 = record.findall("datafield[@tag='856'][@ind1='4'][@ind2=' ']") datafields_540 = record.findall("datafield/[@tag='540']") - if datafields_856 is None: - continue - if datafields_540 is None: - continue - is_it_wanted_record_by_856 = is_subset_856_for_green_access(datafields_856) - is_it_wanted_record_by_540_preprint = is_subset_540_preprint_green_access( - datafields_540 - ) + is_it_wanted_record_by_856 = None + is_it_wanted_record_by_540_preprint = None + + if datafields_856: + is_it_wanted_record_by_856 = is_subset_856_for_green_access(datafields_856) + if datafields_540: + is_it_wanted_record_by_540_preprint = is_subset_540_preprint_green_access( + datafields_540 + ) is_it_wanted_record_by_540_publication = ( not is_subset_540_publication_golden_access(datafields_540) ) @@ -104,8 +105,8 @@ def parse_subset_golden_access(records): for record in records: datafields_540 = record.findall("datafield/[@tag='540']") if datafields_540 is None: - continue - if is_subset_540_publication_golden_access(datafields_540): + pass + elif is_subset_540_publication_golden_access(datafields_540): filtered_records.append(record) return filtered_records diff --git a/dags/open_access/utils.py b/dags/open_access/utils.py index 007f9f3..2c70eb3 100644 --- a/dags/open_access/utils.py +++ b/dags/open_access/utils.py @@ -3,6 +3,7 @@ from airflow.exceptions import AirflowException from airflow.providers.http.hooks.http import HttpHook +from common.utils import get_total_results_count from open_access.parsers import ( get_golden_access_records_ids, get_green_access_records_ids, @@ -27,7 +28,8 @@ def get_count_http_hook(total, url, record_extractor): ) all_ids.extend(record_extractor(response.text)) records_ids_count = records_ids_count + len(record_extractor(response.text)) - logging.info(f"In total was found {records_ids_count} golden access records") + logging.info(f"In total was found {records_ids_count} records") + logging.info(f"Records ids: {all_ids}") return records_ids_count @@ -46,3 +48,55 @@ def get_url(query, current_collection="Published+Articles"): + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm" ) return url + + +def fetch_count_from_comments(parameters, previous={}): + http_hook = HttpHook(http_conn_id="cds", method="GET") + response = http_hook.run_with_advanced_retry( + endpoint=parameters["endpoint"], + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(AirflowException), + }, + ) + count = get_total_results_count(response.text) + previous.update({parameters["type_of_query"]: count}) + return previous + + +def fetch_count_from_parsed_records( + parameters, + count_function, + previous={}, +): + http_hook = HttpHook(http_conn_id="cds", method="GET") + response = http_hook.run_with_advanced_retry( + endpoint=parameters["endpoint"], + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(AirflowException), + }, + ) + endpoint = parameters["endpoint"] + total = get_total_results_count(response.text) + count = count_function(total, endpoint) + previous.update({parameters["type_of_query"]: count}) + return previous + + +def generate_params(query_object, year): + current_collection = "Published+Articles" + base_query = ( + r"(affiliation:CERN+or+595:'For+annual+report')" + + rf"and+year:{year}+not+980:ConferencePaper+" + + r"not+980:BookChapter" + ) + type_of_query = [*query_object][0] + query = rf"{base_query}+{query_object[type_of_query]}" + + return { + "endpoint": rf"search?ln=en&cc={current_collection}&p={query}" + + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c=" + + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm", + "type_of_query": type_of_query, + }