OA: parsing each open access category sequentially
ErnestaP committed Aug 1, 2024
1 parent 0554a08 commit 1b66dc7
Showing 3 changed files with 115 additions and 66 deletions.
104 changes: 49 additions & 55 deletions dags/open_access/open_access.py
@@ -1,17 +1,17 @@
-from functools import reduce
-
 import open_access.constants as constants
-import open_access.utils as utils
 import pendulum
 from airflow.decorators import dag, task
-from airflow.exceptions import AirflowException
-from airflow.providers.http.hooks.http import HttpHook
 from common.models.open_access.open_access import OAOpenAccess
 from common.operators.sqlalchemy_operator import sqlalchemy_task
-from common.utils import get_total_results_count
 from executor_config import kubernetes_executor_config
+from open_access.utils import (
+    fetch_count_from_comments,
+    fetch_count_from_parsed_records,
+    generate_params,
+    get_golden_access_count,
+    get_green_access_count,
+)
 from sqlalchemy.sql import func
-from tenacity import retry_if_exception_type, stop_after_attempt
 
 
 @dag(
@@ -20,64 +20,58 @@
     params={"year": 2023},
 )
 def oa_dag():
-    @task(multiple_outputs=True, executor_config=kubernetes_executor_config)
-    def generate_params(query_object, **kwargs):
+    @task(executor_config=kubernetes_executor_config)
+    def fetch_closed_access_count(query_object, **kwargs):
         year = kwargs["params"].get("year")
-        current_collection = "Published+Articles"
-        base_query = (
-            r"(affiliation:CERN+or+595:'For+annual+report')"
-            + rf"and+year:{year}+not+980:ConferencePaper+"
-            + r"not+980:BookChapter"
-        )
-        type_of_query = [*query_object][0]
-        query = rf"{base_query}+{query_object[type_of_query]}"
+        parameters = generate_params(query_object, year)
+        return fetch_count_from_comments(parameters=parameters)
 
-        return {
-            "endpoint": rf"search?ln=en&cc={current_collection}&p={query}"
-            + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
-            + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm",
-            "type_of_query": type_of_query,
-        }
+    @task(executor_config=kubernetes_executor_config)
+    def fetch_bronze_open_access_count(query_object, closed_access, **kwargs):
+        year = kwargs["params"].get("year")
+        parameters = generate_params(query_object, year)
+        return fetch_count_from_comments(parameters=parameters, previous=closed_access)
 
     @task(executor_config=kubernetes_executor_config)
-    def fetch_count(parameters):
-        http_hook = HttpHook(http_conn_id="cds", method="GET")
-        response = http_hook.run_with_advanced_retry(
-            endpoint=parameters["endpoint"],
-            _retry_args={
-                "stop": stop_after_attempt(3),
-                "retry": retry_if_exception_type(AirflowException),
-            },
+    def fetch_gold_open_access_count(query_object, bronze_access, **kwargs):
+        year = kwargs["params"].get("year")
+        parameters = generate_params(query_object, year)
+        return fetch_count_from_parsed_records(
+            parameters=parameters,
+            count_function=get_golden_access_count,
+            previous=bronze_access,
         )
-        type_of_query = parameters["type_of_query"]
-        endpoint = parameters["endpoint"]
-        count = get_total_results_count(response.text)
-        if type_of_query == "gold_open_access":
-            total_gold = utils.get_golden_access_count(count, endpoint)
-            return {parameters["type_of_query"]: total_gold}
-        elif type_of_query == "green_open_access":
-            total_green = utils.get_green_access_count(count, endpoint)
-            return {parameters["type_of_query"]: total_green}
-        return {parameters["type_of_query"]: count}
 
-    queries_objects_list = [
-        {"closed_access": constants.CLOSED_ACCESS},
-        {"bronze_open_access": constants.BRONZE_ACCESS},
-        {"green_open_access": constants.GREEN_ACCESS},
-        {"gold_open_access": constants.GOLD_ACCESS},
-    ]
+    @task(executor_config=kubernetes_executor_config)
+    def fetch_green_open_access_count(query_object, gold_access, **kwargs):
+        year = kwargs["params"].get("year")
+        parameters = generate_params(query_object, year)
+        return fetch_count_from_parsed_records(
+            parameters=parameters,
+            count_function=get_green_access_count,
+            previous=gold_access,
+        )
 
-    parameters = generate_params.expand(query_object=queries_objects_list)
-    counts = fetch_count.expand(parameters=parameters)
+    closed_access = fetch_closed_access_count(
+        {"closed_access": constants.CLOSED_ACCESS}
+    )
+    closed_bronze_access = fetch_bronze_open_access_count(
+        {"bronze_open_access": constants.BRONZE_ACCESS}, closed_access
+    )
+    closed_bronze_gold_access = fetch_gold_open_access_count(
+        {"gold_open_access": constants.GOLD_ACCESS}, closed_bronze_access
+    )
+    closed_bronze_gold_green_access = fetch_green_open_access_count(
+        {"green_open_access": constants.GREEN_ACCESS}, closed_bronze_gold_access
+    )
 
-    @task(multiple_outputs=True, executor_config=kubernetes_executor_config)
-    def join_and_add_year(counts, **kwargs):
+    @task(executor_config=kubernetes_executor_config)
+    def add_year(closed_bronze_gold_green_access, **kwargs):
         year = kwargs["params"].get("year")
-        results = reduce(lambda a, b: {**a, **b}, counts)
-        results["year"] = year
-        return results
+        closed_bronze_gold_green_access["year"] = year
+        return closed_bronze_gold_green_access
 
-    results = join_and_add_year(counts)
+    results = add_year(closed_bronze_gold_green_access)
 
     @sqlalchemy_task(conn_id="superset")
     def populate_open_access(results, session, **kwargs):
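The reworked DAG chains the four category tasks so that each one extends the dictionary returned by the task before it, which also makes Airflow run the CDS queries one after another instead of as parallel mapped tasks. A minimal sketch of that accumulation pattern in plain Python, without Airflow; fake_count and the literal numbers are illustrative stand-ins for the real fetch_* helpers and CDS counts:

def fake_count(parameters, previous=None, count=0):
    # Stand-in for fetch_count_from_comments / fetch_count_from_parsed_records:
    # copy the running dict and add this category's count under its key.
    result = dict(previous or {})
    result[parameters["type_of_query"]] = count
    return result

closed = fake_count({"type_of_query": "closed_access"}, count=10)
bronze = fake_count({"type_of_query": "bronze_open_access"}, closed, count=20)
gold = fake_count({"type_of_query": "gold_open_access"}, bronze, count=30)
green = fake_count({"type_of_query": "green_open_access"}, gold, count=40)
green["year"] = 2023  # what add_year does before populate_open_access writes the row

# green == {"closed_access": 10, "bronze_open_access": 20,
#           "gold_open_access": 30, "green_open_access": 40, "year": 2023}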
21 changes: 11 additions & 10 deletions dags/open_access/parsers.py
@@ -78,14 +78,15 @@ def parse_subset_green_access(records):
     for record in records:
         datafields_856 = record.findall("datafield[@tag='856'][@ind1='4'][@ind2=' ']")
         datafields_540 = record.findall("datafield/[@tag='540']")
-        if datafields_856 is None:
-            continue
-        if datafields_540 is None:
-            continue
-        is_it_wanted_record_by_856 = is_subset_856_for_green_access(datafields_856)
-        is_it_wanted_record_by_540_preprint = is_subset_540_preprint_green_access(
-            datafields_540
-        )
+        is_it_wanted_record_by_856 = None
+        is_it_wanted_record_by_540_preprint = None
+
+        if datafields_856:
+            is_it_wanted_record_by_856 = is_subset_856_for_green_access(datafields_856)
+        if datafields_540:
+            is_it_wanted_record_by_540_preprint = is_subset_540_preprint_green_access(
+                datafields_540
+            )
         is_it_wanted_record_by_540_publication = (
             not is_subset_540_publication_golden_access(datafields_540)
         )
@@ -104,8 +105,8 @@ def parse_subset_golden_access(records):
     for record in records:
         datafields_540 = record.findall("datafield/[@tag='540']")
         if datafields_540 is None:
-            continue
-        if is_subset_540_publication_golden_access(datafields_540):
+            pass
+        elif is_subset_540_publication_golden_access(datafields_540):
             filtered_records.append(record)
     return filtered_records
 
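The rewritten guards reflect that Element.findall returns a (possibly empty) list and never None, so the old `is None` checks could not skip anything; truthiness is the meaningful test. A small self-contained illustration — the toy record below is hypothetical, while the path expression is the one used in parsers.py:

import xml.etree.ElementTree as ET

record = ET.fromstring("<record><datafield tag='999'/></record>")
datafields_540 = record.findall("datafield/[@tag='540']")

print(datafields_540 is None)  # False -- findall never returns None
print(bool(datafields_540))    # False -- empty list, so the 540-based checks are skipped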
56 changes: 55 additions & 1 deletion dags/open_access/utils.py
@@ -3,6 +3,7 @@
 
 from airflow.exceptions import AirflowException
 from airflow.providers.http.hooks.http import HttpHook
+from common.utils import get_total_results_count
 from open_access.parsers import (
     get_golden_access_records_ids,
     get_green_access_records_ids,
@@ -27,7 +28,8 @@ def get_count_http_hook(total, url, record_extractor):
         )
         all_ids.extend(record_extractor(response.text))
         records_ids_count = records_ids_count + len(record_extractor(response.text))
-    logging.info(f"In total was found {records_ids_count} golden access records")
+    logging.info(f"In total was found {records_ids_count} records")
+    logging.info(f"Records ids: {all_ids}")
     return records_ids_count
 
 
@@ -46,3 +48,55 @@ def get_url(query, current_collection="Published+Articles"):
         + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm"
     )
     return url
+
+
+def fetch_count_from_comments(parameters, previous={}):
+    http_hook = HttpHook(http_conn_id="cds", method="GET")
+    response = http_hook.run_with_advanced_retry(
+        endpoint=parameters["endpoint"],
+        _retry_args={
+            "stop": stop_after_attempt(3),
+            "retry": retry_if_exception_type(AirflowException),
+        },
+    )
+    count = get_total_results_count(response.text)
+    previous.update({parameters["type_of_query"]: count})
+    return previous
+
+
+def fetch_count_from_parsed_records(
+    parameters,
+    count_function,
+    previous={},
+):
+    http_hook = HttpHook(http_conn_id="cds", method="GET")
+    response = http_hook.run_with_advanced_retry(
+        endpoint=parameters["endpoint"],
+        _retry_args={
+            "stop": stop_after_attempt(3),
+            "retry": retry_if_exception_type(AirflowException),
+        },
+    )
+    endpoint = parameters["endpoint"]
+    total = get_total_results_count(response.text)
+    count = count_function(total, endpoint)
+    previous.update({parameters["type_of_query"]: count})
+    return previous
+
+
+def generate_params(query_object, year):
+    current_collection = "Published+Articles"
+    base_query = (
+        r"(affiliation:CERN+or+595:'For+annual+report')"
+        + rf"and+year:{year}+not+980:ConferencePaper+"
+        + r"not+980:BookChapter"
+    )
+    type_of_query = [*query_object][0]
+    query = rf"{base_query}+{query_object[type_of_query]}"
+
+    return {
+        "endpoint": rf"search?ln=en&cc={current_collection}&p={query}"
+        + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+        + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm",
+        "type_of_query": type_of_query,
+    }
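For reference, a usage sketch of the new helpers, assuming the repository's dags/ packages and Airflow are importable. The CLOSED_ACCESS value below is a placeholder, since open_access/constants.py is not part of this commit, and the fetch_* call is left as a comment because it requires the "cds" Airflow connection:

from open_access.utils import fetch_count_from_comments, generate_params

CLOSED_ACCESS = r"not+540:'Open+Access'"  # placeholder query fragment, not the real constant

params = generate_params({"closed_access": CLOSED_ACCESS}, 2023)
print(params["type_of_query"])             # "closed_access"
print(params["endpoint"].split("&p=")[0])  # "search?ln=en&cc=Published+Articles"

# With an Airflow connection named "cds" configured, each helper adds its count
# to the dict it is given and returns it, so the chained tasks share one result:
# counts = fetch_count_from_comments(parameters=params)
# counts == {"closed_access": <total parsed from the CDS XML response>}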
