Skip to content

Commit

Permalink
DAGs: migration from requests lib to http hook
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Jul 9, 2024
1 parent 33bd15e commit d6eb5a1
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 91 deletions.
5 changes: 0 additions & 5 deletions dags/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ def __init__(self, input, current_year):
)


class DataFetchError(Exception):
    """Raised when an HTTP fetch for a URL does not succeed."""

    def __init__(self, status_code, url):
        # Compose the message first, then hand it to the Exception base.
        message = f"Data fetch failure, status_code={status_code}, url={url}"
        super().__init__(message)


class NotFoundTotalCountOfRecords(Exception):
def __init__(
self,
Expand Down
18 changes: 1 addition & 17 deletions dags/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,7 @@
import datetime
import re

import requests
from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput


def request_again_if_failed(url, cds_token=None):
    """GET *url*, retrying on HTTP 502, and return the successful response.

    Args:
        url: absolute URL to fetch.
        cds_token: optional CDS API token; when given it is sent as an
            ``Authorization: token <...>`` header on every attempt.

    Returns:
        The ``requests.Response`` with status code 200.

    Raises:
        DataFetchError: if the final response status is not 200.
    """
    # Bug fix: the original passed the header dict as the second positional
    # argument of requests.get (which is `params`, not `headers`) and then
    # unconditionally overwrote the authenticated response with a plain
    # unauthenticated GET. Build the headers once and reuse them everywhere.
    headers = {"Authorization": f"token {cds_token}"} if cds_token else None
    response = requests.get(url, headers=headers)
    attempts = 1

    # 502s are transient upstream failures; retry up to 9 more times,
    # keeping the auth header on every retry (the original dropped it).
    while response.status_code == 502 and attempts < 10:
        attempts += 1
        response = requests.get(url, headers=headers)
    if response.status_code != 200:
        raise DataFetchError(url=url, status_code=response.status_code)
    return response
from common.exceptions import NotFoundTotalCountOfRecords, WrongInput


def get_total_results_count(data):
Expand Down
76 changes: 42 additions & 34 deletions dags/library/cern_publication_records.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging
import os
from functools import reduce

import pendulum
from airflow.decorators import dag, task
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count, request_again_if_failed
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from library.utils import get_url
from tenacity import retry_if_exception_type, stop_after_attempt


@dag(
Expand All @@ -16,41 +16,53 @@
params={"year": 2023},
)
def library_cern_publication_records_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(key, **kwargs):
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def generate_params(key, **kwargs):
year = kwargs["params"].get("year")
cds_token = os.environ.get("CDS_TOKEN")
if not cds_token:
logging.warning("cds token is not set!")
type_of_query = key
url = get_url(type_of_query, year)
data = request_again_if_failed(url=url, cds_token=cds_token)
total = get_total_results_count(data.text)
return {type_of_query: total}
url = get_url(key, year)
return {
"endpoint": url,
"type_of_query": key,
}

@task
def fetch_count(parameters):
    """Query CDS and return ``{type_of_query: total_results_count}``.

    ``parameters`` is expected to be a dict with an ``endpoint`` (URL
    relative to the ``cds`` HTTP connection's base URL) and a
    ``type_of_query`` label — presumably produced by the upstream
    ``generate_params`` task; TODO confirm.
    """
    # The Airflow connection id "cds" supplies the host; only the relative
    # endpoint is passed per request.
    http_hook = HttpHook(http_conn_id="cds", method="GET")
    # Retry the call up to 3 attempts on any exception (tenacity args are
    # forwarded by run_with_advanced_retry).
    response = http_hook.run_with_advanced_retry(
        endpoint=parameters["endpoint"],
        _retry_args={
            "stop": stop_after_attempt(3),
            "retry": retry_if_exception_type(Exception),
        },
    )
    count = get_total_results_count(response.text)
    # Key the count by the query label so downstream tasks can merge the
    # per-query dicts into one result set.
    return {parameters["type_of_query"]: count}

keys_list = [
"publications_total_count",
"conference_proceedings_count",
"non_journal_proceedings_count",
]

parameters = generate_params.expand(key=keys_list)
counts = fetch_count.expand(parameters=parameters)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
def join_and_add_year(counts, **kwargs):
year = kwargs["params"].get("year")
results = reduce(lambda a, b: {**a, **b}, counts)
results["year"] = year
return results

results = fetch_data_task.expand(
key=[
"publications_total_count",
"conference_proceedings_count",
"non_journal_proceedings_count",
],
)
unpacked_results = join(results)

results = join_and_add_year(counts)
PostgresOperator(
task_id="populate_library_cern_publication_records_table",
postgres_conn_id="superset_qa",
sql="""
INSERT INTO library_cern_publication_records (year,
publications_total_count, conference_proceedings_count,
non_journal_proceedings_count, created_at, updated_at)
VALUES (%(years)s, %(publications_total_count)s,
VALUES (%(year)s, %(publications_total_count)s,
%(conference_proceedings_count)s, %(non_journal_proceedings_count)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
Expand All @@ -61,14 +73,10 @@ def join(values, **kwargs):
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"publications_total_count": unpacked_results["publications_total_count"],
"conference_proceedings_count": unpacked_results[
"conference_proceedings_count"
],
"non_journal_proceedings_count": unpacked_results[
"non_journal_proceedings_count"
],
"year": results["year"],
"publications_total_count": results["publications_total_count"],
"conference_proceedings_count": results["conference_proceedings_count"],
"non_journal_proceedings_count": results["non_journal_proceedings_count"],
},
executor_config=kubernetes_executor_config,
)
Expand Down
6 changes: 3 additions & 3 deletions dags/library/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
def get_url(key, year):
url = {
"publications_total_count": r"http://cdsweb.cern.ch/search?wl=0&ln=en&cc=Published+"
"publications_total_count": r"search?wl=0&ln=en&cc=Published+"
+ r"Articles&sc=1&p=%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+and+"
+ rf"year%3A{year}+not+980%3AConferencePaper+not+980%3ABookChapter+not+595%3A%27Not+"
+ r"for+annual+report%27&f=&action_search=Search&of=xm",
"conference_proceedings_count": r"http://cdsweb.cern.ch/search?wl=0&ln=en&cc=Published+"
"conference_proceedings_count": r"search?wl=0&ln=en&cc=Published+"
+ r"Articles&p=980%3AARTICLE+and+%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+"
+ rf"and+year%3A{year}+and+980%3AConferencePaper+not+595%3A%27Not+for+annual+"
+ r"report%27&f=&action_search=Search&c=Published+Articles&c=&sf=author&so=a&rm=&rg=10&sc=1&of=xm",
"non_journal_proceedings_count": r"https://cds.cern.ch/search?ln=en&p=affiliation%3ACERN+or+"
"non_journal_proceedings_count": r"search?ln=en&p=affiliation%3ACERN+or+"
+ rf"260%3ACERN+and+260%3A{year}+and+%28980%3ABOOK+or+980%3APROCEEDINGS+or+690%3A%27YELLOW+REPORT%27+"
+ r"or+980%3ABookChapter+or+980%3AREPORT%29+not+595%3A%27Not+for+annual+report%27&action_search="
+ r"Search&op1=a&m1=a&p1=&f1=&c=CERN+Document+Server&sf=&so=d&rm=&rg=10&sc=1&of=xm",
Expand Down
80 changes: 53 additions & 27 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count, request_again_if_failed
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from tenacity import retry_if_exception_type, stop_after_attempt


@dag(
Expand All @@ -15,47 +17,71 @@
params={"year": 2023},
)
def oa_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(query, **kwargs):
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def generate_params(query, **kwargs):
year = kwargs["params"].get("year")
current_collection = "Published+Articles"
base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter"
)
type_of_query = [*query][0]
url = utils.get_url(query=f"{base_query}")
data = request_again_if_failed(url=url)
total = get_total_results_count(data.text)
query_p = rf"{base_query}+{query[type_of_query]}"

return {
"endpoint": rf"search?ln=en&cc={current_collection}&p={query_p}"
+ r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm",
"type_of_query": type_of_query,
}

@task(executor_config=kubernetes_executor_config)
def fetch_count(parameters):
http_hook = HttpHook(http_conn_id="cds", method="GET")
response = http_hook.run_with_advanced_retry(
endpoint=parameters["endpoint"],
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(Exception),
},
)
type_of_query = parameters["type_of_query"]
endpoint = parameters["endpoint"]
total = get_total_results_count(response.text)
if type_of_query == "gold":
total = utils.get_golden_access_count(total, url)
total = utils.get_golden_access_count(total, endpoint)
if type_of_query == "green":
total = utils.get_green_access_count(total, url)
return {type_of_query: total}
total = utils.get_green_access_count(total, endpoint)
count = get_total_results_count(response.text)
return {parameters["type_of_query"]: count}

query_list = [
{"closed": constants.CLOSED_ACCESS},
{"bronze": constants.BRONZE_ACCESS},
{"green": constants.GREEN_ACCESS},
{"gold": constants.GOLD_ACCESS},
]

parameters = generate_params.expand(query=query_list)
counts = fetch_count.expand(parameters=parameters)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
def join_and_add_year(counts, **kwargs):
year = kwargs["params"].get("year")
results = reduce(lambda a, b: {**a, **b}, counts)
results["year"] = year
return results

results = fetch_data_task.expand(
query=[
{"closed": constants.CLOSED_ACCESS},
{"bronze": constants.BRONZE_ACCESS},
{"green": constants.GREEN_ACCESS},
{"gold": constants.GOLD_ACCESS},
],
)
unpacked_results = join(results)
results = join_and_add_year(counts)

PostgresOperator(
task_id="populate_open_access_table",
postgres_conn_id="superset_qa",
sql="""
INSERT INTO oa_open_access (year, closed_access, bronze_open_access,
green_open_access, gold_open_access, created_at, updated_at)
VALUES (%(years)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s,
VALUES (%(year)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
Expand All @@ -66,11 +92,11 @@ def join(values, **kwargs):
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"closed": unpacked_results["closed"],
"bronze": unpacked_results["bronze"],
"green": unpacked_results["green"],
"gold": unpacked_results["gold"],
"year": results["year"],
"closed": results["closed"],
"bronze": results["bronze"],
"green": results["green"],
"gold": results["gold"],
},
executor_config=kubernetes_executor_config,
)
Expand Down
18 changes: 13 additions & 5 deletions dags/open_access/utils.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,39 @@
import logging
import math

from common.utils import request_again_if_failed
from airflow.providers.http.hooks.http import HttpHook
from open_access.parsers import (
get_golden_access_records_ids,
get_green_access_records_ids,
)
from tenacity import retry_if_exception_type, stop_after_attempt


def get_count_http_hook(total, url, record_extractor):
    """Page through a CDS result set and count the extracted record ids.

    CDS returns at most 100 records per page, so ``total`` (the count
    reported by the search endpoint) determines how many ``jrec`` offsets
    must be fetched.

    Args:
        total: total number of records reported for the search.
        url: CDS search URL relative to the ``cds`` HTTP connection's base.
        record_extractor: callable mapping a response body (str) to a
            collection of record ids.

    Returns:
        Total number of record ids found across all pages.
    """
    # Merge-artifact fix: stale lines from the pre-HttpHook implementation
    # (a duplicate `def get_count(...)` header and a call to the removed
    # `request_again_if_failed` helper) were interleaved with this body.
    http_hook = HttpHook(http_conn_id="cds", method="GET")
    iterations = math.ceil(total / 100.0)
    records_ids_count = 0
    for i in range(0, iterations):
        # CDS pagination is 1-based: jrec is the index of the first record
        # on the requested page.
        jrec = (i * 100) + 1
        full_url = f"{url}&jrec={jrec}"
        response = http_hook.run_with_advanced_retry(
            endpoint=full_url,
            _retry_args={
                "stop": stop_after_attempt(3),
                "retry": retry_if_exception_type(Exception),
            },
        )
        records_ids_count = records_ids_count + len(record_extractor(response.text))
    # Fixed misleading message: this helper serves green access counts too,
    # not only golden. Lazy %-formatting keeps the logging call cheap.
    logging.info("Found %d records in total", records_ids_count)
    return records_ids_count


# Backward-compatible alias so callers written against the old helper name
# keep working.
get_count = get_count_http_hook


def get_golden_access_count(total, url):
    """Count golden (gold) open-access records for the given CDS search URL.

    Fix: the block contained two consecutive return statements; the first
    (reachable) one called the removed `get_count` helper and made the
    HttpHook-based call unreachable. Only the HttpHook path is kept.
    """
    return get_count_http_hook(total, url, get_golden_access_records_ids)


def get_green_access_count(total, url):
    """Count green open-access records for the given CDS search URL.

    Fix: the block contained two consecutive return statements; the first
    (reachable) one called the removed `get_count` helper and made the
    HttpHook-based call unreachable. Only the HttpHook path is kept.
    """
    return get_count_http_hook(total, url, get_green_access_records_ids)


def get_url(query, current_collection="Published+Articles"):
Expand Down

0 comments on commit d6eb5a1

Please sign in to comment.