better error handling
extreme4all committed Sep 19, 2023
1 parent 0b28f25 commit b5e96d0
Showing 1 changed file with 28 additions and 34 deletions.
62 changes: 28 additions & 34 deletions src/jobs/kafka/players/main.py
@@ -63,14 +63,6 @@ def __init__(self, message_queue: Queue):
self.headers = {"token": APPCONFIG.API_TOKEN}
self.session = aiohttp.ClientSession(headers=self.headers)

async def handle_aiohttp_error(self, response: ClientResponse):
if response.status != 200:
logger.error(
f"response status {response.status} "
f"response body: {await response.text()}"
)
raise Exception("error fetching players")

async def add_data_to_queue(self, players: List[dict], unique_ids: List[int]):
today = datetime.now().date()
added = 0
@@ -97,49 +89,51 @@ async def add_data_to_queue(self, players: List[dict], unique_ids: List[int]):
logger.info(f"{qsize=}, {len(unique_ids)=}")
logger.info(f"{added=}")

async def get_data(self):
async def get_data(self, url:str, params:dict) -> list[dict]:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, headers=self.headers) as resp:
if not resp.ok:
error_message = (
f"response status {resp.status} "
f"response body: {await resp.text()}"
)
logger.error(error_message)
raise ValueError(error_message)
data = await resp.json()
return data

async def main(self):
logger.info(f"starting: {self.__class__.__name__}")
page = 1
unique_ids = deque(maxlen=1_000_000)
last_day = datetime.now().date()
max_id = 1
max_id = 0

while True:
if self.message_queue.qsize() > int(self.message_queue.maxsize / 2):
await asyncio.sleep(1)
continue

# page = page if page <= 10 else 0 # only for scraper endpoint
# url = f"{APPCONFIG.ENDPOINT}/v1/scraper/players/{page}/{APPCONFIG.BATCH_SIZE}/{APPCONFIG.API_TOKEN}"

url = f"{APPCONFIG.ENDPOINT}/v2/players/"

params = {
"page_size": APPCONFIG.BATCH_SIZE,
"greater_than": max_id
}

headers = {"token": APPCONFIG.API_TOKEN}

logger.info(f"fetching players to scrape {page=}, {max_id=}")

today = datetime.now().date()

if today != last_day:
last_day = datetime.now().date()
unique_ids.clear()
page = 1
max_id = 0

logger.info(f"fetching players to scrape {page=}, {max_id=}")

try:
async with self.session.get(
url, params=params, headers=headers
) as response:
await self.handle_aiohttp_error(response)
players = await response.json()
url = f"{APPCONFIG.ENDPOINT}/v2/players/"
params = {
"page_size": APPCONFIG.BATCH_SIZE,
"greater_than": max_id
}
players = await self.get_data(url=url, params=params)
except Exception as error:
logger.error(f"An error occurred: {type(error)} - {str(error)}")
await asyncio.sleep(5) # Adjust the delay as needed
await asyncio.sleep(5)
continue

logger.info(f"fetched {len(players)} players")
@@ -158,23 +152,23 @@ async def get_data(self):


async def retry(f) -> None:
class_name = f.__self__.__class__.__name__
method_name = f.__name__
while True:
try:
class_name = f.__self__.__class__.__name__
method_name = f.__name__
logger.info(f"starting: {class_name}.{method_name}")
await f()
except Exception as e:
logger.error(f"{class_name}.{method_name} - {e}")
pass
await asyncio.sleep(15)


async def async_main():
message_queue = Queue(maxsize=10_000)
data_fetcher = DataFetcher(message_queue)
kafka_producer = KafkaProducer("player", message_queue)

asyncio.ensure_future(retry(data_fetcher.get_data))
asyncio.ensure_future(retry(data_fetcher.main))
asyncio.ensure_future(retry(kafka_producer.start))


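The removed handle_aiohttp_error helper raised a bare Exception; the new inline check in get_data logs the response status and body and raises a ValueError, which main catches before backing off and retrying. Below is a minimal, standalone sketch of that pattern. The endpoint, token, batch size, and the fetch_players / scrape_forever names are placeholders (the real values come from APPCONFIG), and the cursor advance assumes each record carries an "id" field, so treat it as an illustration of the approach rather than the repository's code.

    import asyncio
    import logging

    import aiohttp

    logger = logging.getLogger(__name__)

    # Placeholder configuration; the real values come from APPCONFIG.
    ENDPOINT = "https://example.com"
    HEADERS = {"token": "changeme"}
    BATCH_SIZE = 100


    async def fetch_players(max_id: int) -> list[dict]:
        # One page of players with id greater than max_id (keyset pagination),
        # mirroring the inline error handling of get_data above.
        url = f"{ENDPOINT}/v2/players/"
        params = {"page_size": BATCH_SIZE, "greater_than": max_id}
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params, headers=HEADERS) as resp:
                if not resp.ok:
                    error_message = (
                        f"response status {resp.status} "
                        f"response body: {await resp.text()}"
                    )
                    logger.error(error_message)
                    raise ValueError(error_message)
                return await resp.json()


    async def scrape_forever() -> None:
        max_id = 0
        while True:
            try:
                players = await fetch_players(max_id)
            except Exception as error:
                logger.error(f"An error occurred: {type(error)} - {error}")
                await asyncio.sleep(5)  # back off, then retry the same page
                continue
            if players:
                max_id = max(player["id"] for player in players)  # advance the cursor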

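The tail of the file is collapsed in this diff view, so the module's entry point is not shown. Since async_main only schedules its tasks with asyncio.ensure_future and then returns, one plausible way to drive it is to keep the event loop alive afterwards. The sketch below is an assumption about that wiring, with a stub async_main standing in for the one defined in the diff; it is not the file's actual code.

    import asyncio
    from asyncio import Queue


    async def async_main() -> None:
        # Stand-in for the async_main in the diff: it schedules background tasks
        # with asyncio.ensure_future and returns immediately.
        message_queue: Queue = Queue(maxsize=10_000)
        ...


    if __name__ == "__main__":
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.create_task(async_main())
        loop.run_forever()  # keep the scheduled background tasks running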