Skip to content

Commit

Permalink
OA: retry 10 times to harvest data from CDS
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Apr 24, 2024
1 parent 91013ed commit 06f5946
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
10 changes: 10 additions & 0 deletions dags/common/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class WrongInput(Exception):
def __init__(self, input, current_year):
super().__init__(
f"Wrong input. Input should be digits and in range of 2004 to {current_year}: {input}"
)


class DataFetchError(Exception):
def __init__(self, status_code, url):
super().__init__(f"Data fetch failure, status_code={status_code}, url={url}")
11 changes: 9 additions & 2 deletions dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.exceptions import DataFetchError
from executor_config import kubernetes_executor_config


Expand All @@ -23,8 +24,14 @@ def fetch_data_task(query, **kwargs):
)
type_of_query = [*query][0]
url = utils.get_url(f"{golden_access_base_query}+{query[type_of_query]}")
data = utils.get_data(url)
total = utils.get_total_results_count(data.text)
response = utils.get_data(url)
count = 1
while response.status_code == 502 and count != 10:
count = count + 1
response = utils.get_data(url)
if response.status_code != 200:
raise DataFetchError(url, response.status_code)
total = utils.get_total_results_count(response.text)
return {type_of_query: total}

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
Expand Down
12 changes: 11 additions & 1 deletion dags/open_access/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import datetime
import math
import re

import backoff
import requests
from common.exceptions import WrongInput
from open_access.parsers import get_golden_access_records_ids


Expand Down Expand Up @@ -62,7 +64,15 @@ def get_total_results_count(data):


@backoff.on_exception(
backoff.expo, requests.exceptions.ProxyError, max_time=120, max_tries=5
backoff.expo, requests.exceptions.ProxyError, max_time=1000, max_tries=10
)
def get_data(url):
return requests.get(url)


def check_year(year):
current_year = datetime.date.today().year
if type(year) == int:
if int(year) >= 2004 and int(year) <= current_year:
return year
raise WrongInput(year, current_year)

0 comments on commit 06f5946

Please sign in to comment.