From 06f59469bb143dc9be3e6ece8987d4e95c85428f Mon Sep 17 00:00:00 2001 From: ErnestaP Date: Wed, 24 Apr 2024 11:56:55 +0200 Subject: [PATCH] OA: retry 10 times to harvest data from CDS --- dags/common/exceptions.py | 10 ++++++++++ dags/open_access/gold_open_access_mechanisms.py | 11 +++++++++-- dags/open_access/utils.py | 12 +++++++++++- 3 files changed, 30 insertions(+), 3 deletions(-) create mode 100644 dags/common/exceptions.py diff --git a/dags/common/exceptions.py b/dags/common/exceptions.py new file mode 100644 index 0000000..199a119 --- /dev/null +++ b/dags/common/exceptions.py @@ -0,0 +1,10 @@ +class WrongInput(Exception): + def __init__(self, input, current_year): + super().__init__( + f"Wrong input. Input should be digits and in range of 2004 to {current_year}: {input}" + ) + + +class DataFetchError(Exception): + def __init__(self, status_code, url): + super().__init__(f"Data fetch failure, status_code={status_code}, url={url}") diff --git a/dags/open_access/gold_open_access_mechanisms.py b/dags/open_access/gold_open_access_mechanisms.py index 3507a8e..c008b01 100644 --- a/dags/open_access/gold_open_access_mechanisms.py +++ b/dags/open_access/gold_open_access_mechanisms.py @@ -4,6 +4,7 @@ import pendulum from airflow.decorators import dag, task from airflow.providers.postgres.operators.postgres import PostgresOperator +from common.exceptions import DataFetchError from executor_config import kubernetes_executor_config @@ -23,8 +24,14 @@ def fetch_data_task(query, **kwargs): ) type_of_query = [*query][0] url = utils.get_url(f"{golden_access_base_query}+{query[type_of_query]}") - data = utils.get_data(url) - total = utils.get_total_results_count(data.text) + response = utils.get_data(url) + count = 1 + while response.status_code == 502 and count != 10: + count = count + 1 + response = utils.get_data(url) + if response.status_code != 200: + raise DataFetchError(url, response.status_code) + total = utils.get_total_results_count(response.text) return {type_of_query: total} @task(multiple_outputs=True, executor_config=kubernetes_executor_config) diff --git a/dags/open_access/utils.py b/dags/open_access/utils.py index 0c8955e..0bf334e 100644 --- a/dags/open_access/utils.py +++ b/dags/open_access/utils.py @@ -1,8 +1,10 @@ +import datetime import math import re import backoff import requests +from common.exceptions import WrongInput from open_access.parsers import get_golden_access_records_ids @@ -62,7 +64,15 @@ def get_total_results_count(data): @backoff.on_exception( - backoff.expo, requests.exceptions.ProxyError, max_time=120, max_tries=5 + backoff.expo, requests.exceptions.ProxyError, max_time=1000, max_tries=10 ) def get_data(url): return requests.get(url) + + +def check_year(year): + current_year = datetime.date.today().year + if type(year) == int: + if int(year) >= 2004 and int(year) <= current_year: + return year + raise WrongInput(year, current_year)