From b5e96d043f778fe760ce4fa81b4a6dc6017a1bdd Mon Sep 17 00:00:00 2001
From: extreme4all <40169115+extreme4all@users.noreply.github.com>
Date: Tue, 19 Sep 2023 22:26:31 +0200
Subject: [PATCH] better error handling

---
 src/jobs/kafka/players/main.py | 62 +++++++++++++++-------------------
 1 file changed, 28 insertions(+), 34 deletions(-)

diff --git a/src/jobs/kafka/players/main.py b/src/jobs/kafka/players/main.py
index 0cbba8e..bc33df2 100644
--- a/src/jobs/kafka/players/main.py
+++ b/src/jobs/kafka/players/main.py
@@ -63,14 +63,6 @@ def __init__(self, message_queue: Queue):
         self.headers = {"token": APPCONFIG.API_TOKEN}
         self.session = aiohttp.ClientSession(headers=self.headers)
 
-    async def handle_aiohttp_error(self, response: ClientResponse):
-        if response.status != 200:
-            logger.error(
-                f"response status {response.status} "
-                f"response body: {await response.text()}"
-            )
-            raise Exception("error fetching players")
-
     async def add_data_to_queue(self, players: List[dict], unique_ids: List[int]):
         today = datetime.now().date()
         added = 0
@@ -97,32 +89,31 @@ async def add_data_to_queue(self, players: List[dict], unique_ids: List[int]):
         logger.info(f"{qsize=}, {len(unique_ids)=}")
         logger.info(f"{added=}")
 
-    async def get_data(self):
+    async def get_data(self, url: str, params: dict) -> list[dict]:
+        async with aiohttp.ClientSession() as session:
+            async with session.get(url, params=params, headers=self.headers) as resp:
+                if not resp.ok:
+                    error_message = (
+                        f"response status {resp.status} "
+                        f"response body: {await resp.text()}"
+                    )
+                    logger.error(error_message)
+                    raise ValueError(error_message)
+                data = await resp.json()
+                return data
+
+    async def main(self):
         logger.info(f"starting: {self.__class__.__name__}")
         page = 1
         unique_ids = deque(maxlen=1_000_000)
         last_day = datetime.now().date()
-        max_id = 1
+        max_id = 0
 
         while True:
             if self.message_queue.qsize() > int(self.message_queue.maxsize / 2):
                 await asyncio.sleep(1)
                 continue
 
-            # page = page if page <= 10 else 0 # only for scraper endpoint
-            # url = f"{APPCONFIG.ENDPOINT}/v1/scraper/players/{page}/{APPCONFIG.BATCH_SIZE}/{APPCONFIG.API_TOKEN}"
-
-            url = f"{APPCONFIG.ENDPOINT}/v2/players/"
-
-            params = {
-                "page_size": APPCONFIG.BATCH_SIZE,
-                "greater_than": max_id
-            }
-
-            headers = {"token": APPCONFIG.API_TOKEN}
-
-            logger.info(f"fetching players to scrape {page=}, {max_id=}")
-
             today = datetime.now().date()
 
             if today != last_day:
@@ -130,16 +121,19 @@ async def get_data(self):
                 unique_ids.clear()
                 page = 1
                 max_id = 0
+
+            logger.info(f"fetching players to scrape {page=}, {max_id=}")
 
             try:
-                async with self.session.get(
-                    url, params=params, headers=headers
-                ) as response:
-                    await self.handle_aiohttp_error(response)
-                    players = await response.json()
+                url = f"{APPCONFIG.ENDPOINT}/v2/players/"
+                params = {
+                    "page_size": APPCONFIG.BATCH_SIZE,
+                    "greater_than": max_id
+                }
+                players = await self.get_data(url=url, params=params)
             except Exception as error:
                 logger.error(f"An error occurred: {type(error)} - {str(error)}")
-                await asyncio.sleep(5)  # Adjust the delay as needed
+                await asyncio.sleep(5)
                 continue
 
             logger.info(f"fetched {len(players)} players")
@@ -158,15 +152,15 @@
 
 
 async def retry(f) -> None:
+    class_name = f.__self__.__class__.__name__
+    method_name = f.__name__
     while True:
         try:
-            class_name = f.__self__.__class__.__name__
-            method_name = f.__name__
            logger.info(f"starting: {class_name}.{method_name}")
             await f()
         except Exception as e:
             logger.error(f"{class_name}.{method_name} - {e}")
-            pass
+            await asyncio.sleep(15)
 
 
 async def async_main():
@@ -174,7 +168,7 @@ async def async_main():
     data_fetcher = DataFetcher(message_queue)
     kafka_producer = KafkaProducer("player", message_queue)
 
-    asyncio.ensure_future(retry(data_fetcher.get_data))
+    asyncio.ensure_future(retry(data_fetcher.main))
     asyncio.ensure_future(retry(kafka_producer.start))
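
A minimal, self-contained sketch of the new get_data error path: any non-2xx
response is logged and raised as a ValueError carrying the status and body,
which DataFetcher.main then catches before sleeping and retrying. The local
aiohttp test server, its /v2/players/ handler, and the dummy token below are
hypothetical, for illustration only:

import asyncio

import aiohttp
from aiohttp import web


async def get_data(url: str, params: dict, headers: dict) -> list[dict]:
    # Same shape as the patched DataFetcher.get_data: raise on any non-2xx.
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params, headers=headers) as resp:
            if not resp.ok:
                raise ValueError(
                    f"response status {resp.status} "
                    f"response body: {await resp.text()}"
                )
            return await resp.json()


async def players_handler(request: web.Request) -> web.Response:
    # Hypothetical stand-in for the /v2/players/ endpoint; always fails.
    return web.Response(status=500, text="boom")


async def demo() -> None:
    app = web.Application()
    app.router.add_get("/v2/players/", players_handler)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "127.0.0.1", 8080)
    await site.start()
    try:
        await get_data(
            "http://127.0.0.1:8080/v2/players/",
            params={"page_size": 100, "greater_than": 0},
            headers={"token": "dummy"},
        )
    except ValueError as error:
        print(f"An error occurred: {type(error)} - {error}")
    finally:
        await runner.cleanup()


if __name__ == "__main__":
    asyncio.run(demo())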
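
And a sketch of the reworked retry wrapper: the name lookups move out of the
loop (f must be a bound coroutine method for f.__self__ to exist), and a
failure now backs off for 15 seconds instead of falling through pass and
hammering the failing call. FlakyWorker is a hypothetical stand-in for
DataFetcher / KafkaProducer:

import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def retry(f) -> None:
    # f must be a bound coroutine method; resolve the log prefix once.
    class_name = f.__self__.__class__.__name__
    method_name = f.__name__
    while True:
        try:
            logger.info(f"starting: {class_name}.{method_name}")
            await f()
        except Exception as e:
            logger.error(f"{class_name}.{method_name} - {e}")
            await asyncio.sleep(15)  # back off instead of retrying immediately


class FlakyWorker:
    # Hypothetical worker that fails on every run, like a crashing fetch loop.
    async def main(self) -> None:
        raise ValueError("simulated failure")


async def demo() -> None:
    # retry() loops forever, so watch one failed attempt and cancel the task.
    task = asyncio.ensure_future(retry(FlakyWorker().main))
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


if __name__ == "__main__":
    asyncio.run(demo())

One detail worth noting: since Python 3.8, asyncio.CancelledError derives from
BaseException, so the except Exception clause inside retry will not swallow a
cancellation during the sleep.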