Skip to content

Commit

Permalink
user greater_than cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all committed Sep 13, 2023
1 parent bbfba82 commit 0b28f25
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions src/jobs/kafka/players/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__(self, message_queue: Queue):
async def handle_aiohttp_error(self, response: ClientResponse):
    """Fail fast on a non-200 API response.

    Logs the HTTP status and the full response body for debugging,
    then raises so the caller's fetch loop can abort/retry.

    Args:
        response: The aiohttp response returned by the players endpoint.

    Raises:
        Exception: If ``response.status`` is anything other than 200.
    """
    if response.status != 200:
        logger.error(
            # Trailing space keeps status and body readable in one log line.
            f"response status {response.status} "
            f"response body: {await response.text()}"
        )
        raise Exception("error fetching players")
Expand Down Expand Up @@ -102,6 +102,7 @@ async def get_data(self):
page = 1
unique_ids = deque(maxlen=1_000_000)
last_day = datetime.now().date()
max_id = 1

while True:
if self.message_queue.qsize() > int(self.message_queue.maxsize / 2):
Expand All @@ -113,17 +114,22 @@ async def get_data(self):

url = f"{APPCONFIG.ENDPOINT}/v2/players/"

params = {"page": page, "page_size": APPCONFIG.BATCH_SIZE}
params = {
"page_size": APPCONFIG.BATCH_SIZE,
"greater_than": max_id
}

headers = {"token": APPCONFIG.API_TOKEN}

logger.info(f"fetching players to scrape {page=}")
logger.info(f"fetching players to scrape {page=}, {max_id=}")

today = datetime.now().date()

if today != last_day:
last_day = datetime.now().date()
unique_ids.clear()
page = 1
max_id = 0

try:
async with self.session.get(
Expand All @@ -144,6 +150,9 @@ async def get_data(self):
continue

asyncio.ensure_future(self.add_data_to_queue(players, unique_ids))

players_max = max([p.get('id') for p in players])
max_id = players_max if players_max > max_id else max_id

page += 1

Expand Down

0 comments on commit 0b28f25

Please sign in to comment.