Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OA: parsing each open access category sequentially #38

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 49 additions & 55 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from functools import reduce

import open_access.constants as constants
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from common.models.open_access.open_access import OAOpenAccess
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from open_access.utils import (
fetch_count_from_comments,
fetch_count_from_parsed_records,
generate_params,
get_golden_access_count,
get_green_access_count,
)
from sqlalchemy.sql import func
from tenacity import retry_if_exception_type, stop_after_attempt


@dag(
Expand All @@ -20,64 +20,58 @@
params={"year": 2023},
)
def oa_dag():
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def generate_params(query_object, **kwargs):
@task(executor_config=kubernetes_executor_config)
def fetch_closed_access_count(query_object, **kwargs):
    """Count closed-access records for the DAG-run year.

    First step of the sequential OA chain: returns a fresh dict
    {"closed_access": count} built from the CDS search total.
    """
    run_year = kwargs["params"].get("year")
    return fetch_count_from_comments(
        parameters=generate_params(query_object, run_year)
    )

return {
"endpoint": rf"search?ln=en&cc={current_collection}&p={query}"
+ r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm",
"type_of_query": type_of_query,
}
@task(executor_config=kubernetes_executor_config)
def fetch_bronze_open_access_count(query_object, closed_access, **kwargs):
    """Count bronze OA records and merge the result into *closed_access*.

    Receiving the previous task's dict both accumulates the counts and
    forces this task to run after the closed-access one.
    """
    run_year = kwargs["params"].get("year")
    params = generate_params(query_object, run_year)
    return fetch_count_from_comments(parameters=params, previous=closed_access)

@task(executor_config=kubernetes_executor_config)
def fetch_count(parameters):
http_hook = HttpHook(http_conn_id="cds", method="GET")
response = http_hook.run_with_advanced_retry(
endpoint=parameters["endpoint"],
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(AirflowException),
},
def fetch_gold_open_access_count(query_object, bronze_access, **kwargs):
    """Count gold OA records (parsed from MARCXML) on top of *bronze_access*.

    Unlike the comment-based counts, gold access needs the records parsed,
    so the dedicated count function is passed through.
    """
    run_year = kwargs["params"].get("year")
    params = generate_params(query_object, run_year)
    return fetch_count_from_parsed_records(
        count_function=get_golden_access_count,
        parameters=params,
        previous=bronze_access,
    )
type_of_query = parameters["type_of_query"]
endpoint = parameters["endpoint"]
count = get_total_results_count(response.text)
if type_of_query == "gold_open_access":
total_gold = utils.get_golden_access_count(count, endpoint)
return {parameters["type_of_query"]: total_gold}
elif type_of_query == "green_open_access":
total_green = utils.get_green_access_count(count, endpoint)
return {parameters["type_of_query"]: total_green}
return {parameters["type_of_query"]: count}

queries_objects_list = [
{"closed_access": constants.CLOSED_ACCESS},
{"bronze_open_access": constants.BRONZE_ACCESS},
{"green_open_access": constants.GREEN_ACCESS},
{"gold_open_access": constants.GOLD_ACCESS},
]
@task(executor_config=kubernetes_executor_config)
def fetch_green_open_access_count(query_object, gold_access, **kwargs):
    """Count green OA records (parsed from MARCXML) on top of *gold_access*.

    Final counting step of the sequential OA chain.
    """
    run_year = kwargs["params"].get("year")
    params = generate_params(query_object, run_year)
    return fetch_count_from_parsed_records(
        count_function=get_green_access_count,
        parameters=params,
        previous=gold_access,
    )

parameters = generate_params.expand(query_object=queries_objects_list)
counts = fetch_count.expand(parameters=parameters)
# Chain the four category counts sequentially (see PR title): each task
# receives the previous task's result dict, which accumulates the counts
# and creates an explicit upstream dependency in Airflow.
closed_access = fetch_closed_access_count(
    {"closed_access": constants.CLOSED_ACCESS}
)
closed_bronze_access = fetch_bronze_open_access_count(
    {"bronze_open_access": constants.BRONZE_ACCESS}, closed_access
)
closed_bronze_gold_access = fetch_gold_open_access_count(
    {"gold_open_access": constants.GOLD_ACCESS}, closed_bronze_access
)
# Final dict holds all four category counts.
closed_bronze_gold_green_access = fetch_green_open_access_count(
    {"green_open_access": constants.GREEN_ACCESS}, closed_bronze_gold_access
)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join_and_add_year(counts, **kwargs):
@task(executor_config=kubernetes_executor_config)
def add_year(closed_bronze_gold_green_access, **kwargs):
    """Stamp the accumulated counts dict with the DAG-run year and return it."""
    closed_bronze_gold_green_access["year"] = kwargs["params"].get("year")
    return closed_bronze_gold_green_access

results = join_and_add_year(counts)
results = add_year(closed_bronze_gold_green_access)

@sqlalchemy_task(conn_id="superset")
def populate_open_access(results, session, **kwargs):
Expand Down
21 changes: 11 additions & 10 deletions dags/open_access/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,15 @@ def parse_subset_green_access(records):
for record in records:
datafields_856 = record.findall("datafield[@tag='856'][@ind1='4'][@ind2=' ']")
datafields_540 = record.findall("datafield/[@tag='540']")
if datafields_856 is None:
continue
if datafields_540 is None:
continue
is_it_wanted_record_by_856 = is_subset_856_for_green_access(datafields_856)
is_it_wanted_record_by_540_preprint = is_subset_540_preprint_green_access(
datafields_540
)
is_it_wanted_record_by_856 = None
is_it_wanted_record_by_540_preprint = None

if datafields_856:
is_it_wanted_record_by_856 = is_subset_856_for_green_access(datafields_856)
if datafields_540:
is_it_wanted_record_by_540_preprint = is_subset_540_preprint_green_access(
datafields_540
)
is_it_wanted_record_by_540_publication = (
not is_subset_540_publication_golden_access(datafields_540)
)
Expand All @@ -104,8 +105,8 @@ def parse_subset_golden_access(records):
for record in records:
datafields_540 = record.findall("datafield/[@tag='540']")
if datafields_540 is None:
continue
if is_subset_540_publication_golden_access(datafields_540):
pass
elif is_subset_540_publication_golden_access(datafields_540):
filtered_records.append(record)
return filtered_records

Expand Down
56 changes: 55 additions & 1 deletion dags/open_access/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from common.utils import get_total_results_count
from open_access.parsers import (
get_golden_access_records_ids,
get_green_access_records_ids,
Expand All @@ -27,7 +28,8 @@ def get_count_http_hook(total, url, record_extractor):
)
all_ids.extend(record_extractor(response.text))
records_ids_count = records_ids_count + len(record_extractor(response.text))
logging.info(f"In total was found {records_ids_count} golden access records")
logging.info(f"In total was found {records_ids_count} records")
logging.info(f"Records ids: {all_ids}")
return records_ids_count


Expand All @@ -46,3 +48,55 @@ def get_url(query, current_collection="Published+Articles"):
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm"
)
return url


def fetch_count_from_comments(parameters, previous=None):
    """Fetch the CDS total-results count for one OA category.

    Args:
        parameters: dict with "endpoint" (relative CDS search URL) and
            "type_of_query" (category key to store the count under).
        previous: optional dict of counts from earlier categories; the new
            count is merged into it. A fresh dict is created when omitted —
            the original ``previous={}`` mutable default was shared across
            calls and could leak counts between DAG runs in one worker.

    Returns:
        dict: *previous* (or a new dict) updated with {type_of_query: count}.
    """
    if previous is None:
        previous = {}
    http_hook = HttpHook(http_conn_id="cds", method="GET")
    # Retry transient CDS failures up to three times.
    response = http_hook.run_with_advanced_retry(
        endpoint=parameters["endpoint"],
        _retry_args={
            "stop": stop_after_attempt(3),
            "retry": retry_if_exception_type(AirflowException),
        },
    )
    count = get_total_results_count(response.text)
    previous.update({parameters["type_of_query"]: count})
    return previous


def fetch_count_from_parsed_records(
    parameters,
    count_function,
    previous=None,
):
    """Fetch and parse CDS records, counting one OA category via *count_function*.

    Args:
        parameters: dict with "endpoint" (relative CDS search URL) and
            "type_of_query" (category key to store the count under).
        count_function: callable(total, endpoint) -> int that walks the
            paginated records (e.g. get_golden_access_count).
        previous: optional dict of counts from earlier categories; the new
            count is merged into it. A fresh dict is created when omitted —
            the original ``previous={}`` mutable default was shared across
            calls and could leak counts between invocations.

    Returns:
        dict: *previous* (or a new dict) updated with {type_of_query: count}.
    """
    if previous is None:
        previous = {}
    http_hook = HttpHook(http_conn_id="cds", method="GET")
    # Retry transient CDS failures up to three times.
    response = http_hook.run_with_advanced_retry(
        endpoint=parameters["endpoint"],
        _retry_args={
            "stop": stop_after_attempt(3),
            "retry": retry_if_exception_type(AirflowException),
        },
    )
    endpoint = parameters["endpoint"]
    total = get_total_results_count(response.text)
    count = count_function(total, endpoint)
    previous.update({parameters["type_of_query"]: count})
    return previous


def generate_params(query_object, year):
    """Build the CDS search endpoint and label for one OA category.

    Args:
        query_object: single-entry dict mapping the category name
            (e.g. "gold_open_access") to its CDS query fragment.
        year: publication year the search is restricted to.

    Returns:
        dict with "endpoint" (relative CDS search URL requesting MARCXML
        output, 100 records per page) and "type_of_query" (category name).
    """
    collection = "Published+Articles"
    type_of_query = next(iter(query_object))
    base_query = (
        r"(affiliation:CERN+or+595:'For+annual+report')"
        rf"and+year:{year}+not+980:ConferencePaper+"
        r"not+980:BookChapter"
    )
    query = rf"{base_query}+{query_object[type_of_query]}"
    endpoint = (
        rf"search?ln=en&cc={collection}&p={query}"
        r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
        r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm"
    )
    return {"endpoint": endpoint, "type_of_query": type_of_query}
Loading