Skip to content

Commit

Permalink
OA: add gold access parser
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Apr 25, 2024
1 parent 8bd2528 commit 3d1359b
Show file tree
Hide file tree
Showing 10 changed files with 21,261 additions and 49 deletions.
24 changes: 24 additions & 0 deletions dags/common/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
class WrongInput(Exception):
def __init__(self, input, current_year):
super().__init__(
f"Wrong input. Input should be digits and in range of 2004 to {current_year}: {input}"
)


class DataFetchError(Exception):
def __init__(self, status_code, url):
super().__init__(f"Data fetch failure, status_code={status_code}, url={url}")


class NotFoundTotalCountOfRecords(Exception):
def __init__(
self,
):
super().__init__("Total count of records is not found!")


class TypeDoesNotExist(Exception):
def __init__(self, type_string, all_types):
super().__init__(
f"{type_string} this type does not exist, Available types: {all_types}"
)
15 changes: 15 additions & 0 deletions dags/open_access/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CLOSED_ACCESS = (
r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+" + r"not+540__f:Bronze+not+540__3:preprint"
)
BRONZE_ACCESS = r"540__f:'Bronze'"
GREEN_ACCESS = (
r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:"
+ r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'"
)
GOLD_ACCESS = r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')"

CERN_READ_AND_PUBLISH = r"540__f:'CERN-RP"
CERN_INDIVIDUAL_APCS = r"540__f:'CERN-APC'"
SCOAP3 = r"540__f:'SCOAP3'"
OTHER = r"540__f:'Other'"
OTHER_COLLECTIVE_MODELS = r"540__f:'Collective'"
13 changes: 7 additions & 6 deletions dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from functools import reduce

import open_access.constants as constants
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
Expand All @@ -23,7 +24,7 @@ def fetch_data_task(query, **kwargs):
)
type_of_query = [*query][0]
url = utils.get_url(f"{golden_access_base_query}+{query[type_of_query]}")
data = utils.get_data(url)
data = utils.request_again_if_failed(url)
total = utils.get_total_results_count(data.text)
return {type_of_query: total}

Expand All @@ -35,11 +36,11 @@ def join(values, **kwargs):

results = fetch_data_task.expand(
query=[
{"cern_read_and_publish": utils.cern_read_and_publish},
{"cern_individual_apcs": utils.cern_individual_apcs},
{"scoap3": utils.scoap3},
{"other": utils.other},
{"other_collective_models": utils.other_collective_models},
{"cern_read_and_publish": constants.CERN_READ_AND_PUBLISH},
{"cern_individual_apcs": constants.CERN_INDIVIDUAL_APCS},
{"scoap3": constants.SCOAP3},
{"other": constants.OTHER},
{"other_collective_models": constants.OTHER_COLLECTIVE_MODELS},
],
)
unpacked_results = join(results)
Expand Down
24 changes: 17 additions & 7 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
from functools import reduce

import open_access.constants as constants
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
Expand All @@ -16,15 +18,24 @@ def oa_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(query, **kwargs):
year = kwargs["params"].get("year")
cds_token = None # os.environ.get("CDS_TOKEN")
if not cds_token:
logging.warning("cds token is not set!")
base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter"
)
type_of_query = [*query][0]
url = utils.get_url(f"{base_query}+{query[type_of_query]}")
data = utils.get_data(url)
url = utils.get_url(
query=f"{base_query}+{query[type_of_query]}", cds_token=cds_token
)
data = utils.request_again_if_failed(url)
total = utils.get_total_results_count(data.text)
if type_of_query == "gold":
total = utils.get_gold_access_count(total, url)
if type_of_query == "green":
total = total - utils.get_gold_access_count(total, url)
return {type_of_query: total}

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
Expand All @@ -35,13 +46,12 @@ def join(values, **kwargs):

results = fetch_data_task.expand(
query=[
{"closed": utils.closed_access_query},
{"bronze": utils.bronze_access_query},
{"green": utils.green_access_query},
{"gold": utils.gold_access_query},
{"closed": constants.CLOSED_ACCESS},
{"bronze": constants.BRONZE_ACCESS},
{"green": constants.GREEN_ACCESS},
{"gold": constants.GOLD_ACCESS},
],
)

unpacked_results = join(results)

PostgresOperator(
Expand Down
37 changes: 37 additions & 0 deletions dags/open_access/parsers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import xml.etree.ElementTree as ET
from io import StringIO


def parse_without_names_spaces(xml):
if type(xml) == str:
it = ET.iterparse(StringIO(xml))
else:
it = ET.iterparse(StringIO(xml.getvalue().decode("utf-8")))
for _, el in it:
el.tag = el.tag.rpartition("}")[-1]
root = it.root
return root


def get_golden_access_records_ids(data):
xml = parse_without_names_spaces(data)
records = xml.findall(".record")
golden_access = []
for record in records:
datafields = record.findall("datafield/[@tag='540']")
if datafields is None:
continue
for datafield in datafields:
record_type = datafield.find("subfield/[@code='3']")
license = datafield.find("subfield/[@code='a']")
if record_type is not None and license is not None:
if (
"CC" in license.text
and "BY" in license.text
and record_type.text == "publication"
):
record_id = record.find("controlfield/[@tag='001']")
if record_id is not None:
doi = record_id.text
golden_access.append(doi)
return golden_access
83 changes: 47 additions & 36 deletions dags/open_access/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
import datetime
import math
import re

import backoff
import requests
from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput
from open_access.parsers import get_golden_access_records_ids


def get_url(query, current_collection="Published+Articles"):
url = (
rf"https://cds.cern.ch/search?ln=en&cc={current_collection}&p={query}"
+ r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm"
)
return url
def request_again_if_failed(url):
response = requests.get(url)
count = 1

while response.status_code == 502 and count != 10:
count = count + 1
response = requests.get(url)
if response.status_code != 200:
raise DataFetchError(url=url, status_code=response.status_code)
return response


def get_total_results_count(data):
Expand All @@ -21,32 +27,37 @@ def get_total_results_count(data):
match = TOTAL_RECORDS_COUNT.search(comment_line)
try:
total_records_count = match.group(1)
return total_records_count
return int(total_records_count)
except AttributeError:
return 0


closed_access_query = (
r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+" + r"not+540__f:Bronze+not+540__3:preprint"
)
bronze_access_query = r"540__f:'Bronze'"
green_access_query = (
r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:"
+ r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'"
)
gold_access_query = (
r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')"
)

cern_read_and_publish = r"540__f:'CERN-RP"
cern_individual_apcs = r"540__f:'CERN-APC'"
scoap3 = r"540__f:'SCOAP3'"
other = r"540__f:'Other'"
other_collective_models = r"540__f:'Collective'"


@backoff.on_exception(
backoff.expo, requests.exceptions.ProxyError, max_time=120, max_tries=5
)
def get_data(url):
return requests.get(url)
raise NotFoundTotalCountOfRecords


def check_year(year):
current_year = datetime.date.today().year
if type(year) == int:
if int(year) >= 2004 and int(year) <= current_year:
return year
raise WrongInput(year, current_year)


def get_gold_access_count(total, url):
iterations = math.ceil(total / 100.0)
records_ids_count = 0
for i in range(0, iterations):
jrec = (i * 100) + 1
full_url = f"{url}&jrec={jrec}"
response = request_again_if_failed(full_url)
records_ids_count = records_ids_count + len(
get_golden_access_records_ids(response.text)
)
return records_ids_count


def get_url(query, current_collection="Published+Articles", cds_token=None):
url = (
rf"https://cds.cern.ch/search?ln=en&cc={current_collection}&p={query}"
+ r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm"
)
url = url + (rf"&apikey={cds_token}" if cds_token else "")
return url
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pre-commit==3.6.2
pytest==7.4.4
coverage==7.4.3
pytest-cov==4.1.0
pytest-datadir==1.5.0
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
apache-airflow[celery, postgres, redis, cncf.kubernetes]==2.8.3
alembic==1.13.1
airflow-provider-alembic==1.0.0
elementpath==4.4.0
Loading

0 comments on commit 3d1359b

Please sign in to comment.