Skip to content

Commit

Permalink
kafka_consumer & producer module
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all committed Aug 6, 2023
1 parent 16ee323 commit 2f4152f
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 143 deletions.
18 changes: 3 additions & 15 deletions src/core/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@

from src import api
from src.core import config
from src.kafka.highscore import HiscoreConsumer, MessageProcessor
from src.kafka.kafka import Kafka
from src.kafka.highscore import HighscoreProcessor

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -102,16 +101,5 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
@app.on_event("startup")
async def startup_event():
    """FastAPI startup hook: launch the background highscore Kafka processor.

    The processor owns its own Kafka consumer (see HighscoreProcessor) and is
    started fire-and-forget so it runs for the lifetime of the application.
    """
    logger.info("startup initiated")
    highscore_processor = HighscoreProcessor(batch_size=100)
    # ensure_future schedules the long-running loop without blocking startup.
    asyncio.ensure_future(highscore_processor.start())
73 changes: 62 additions & 11 deletions src/kafka/highscore.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,81 @@
from src.app.repositories.player import Player as RepositoryPlayer
from src.app.schemas.highscore import PlayerHiscoreData as SchemaHiscore
from src.app.schemas.player import Player as SchemaPlayer
from src.kafka.abc import AbstractConsumer, AbstractMP
from src.database.models import playerHiscoreData as dbHiscore
from src.kafka.modules.kafka_consumer import KafkaMessageConsumer
from src.kafka.modules.kafka_producer import KafkaMessageProducer
from asyncio import Queue
import asyncio
import time
from src.core import config

logger = logging.getLogger(__name__)


class MessageProcessor(AbstractMP):
async def process_message(self, message: ConsumerRecord) -> dict:
message = json.loads(message)
return message
class HighscoreProcessor:
def __init__(self, batch_size: int = 100) -> None:
    """Set up batch bookkeeping, the shared message queue, and repositories."""
    self.batch_size = batch_size
    self.batch = []
    # Bound the queue to two full batches so the consumer cannot run
    # arbitrarily far ahead of the processor.
    self.message_queue = Queue(maxsize=2 * batch_size)
    # Repositories used to persist processed highscore/player rows.
    self.repo_highscore = RepoHiscore()
    self.repo_player = RepositoryPlayer()

class HiscoreConsumer(AbstractConsumer):
# Initialize the repositories for highscores and players
repo_highscore = RepoHiscore()
repo_player = RepositoryPlayer()
async def initialize(self):
    """Create the Kafka consumer that feeds ``self.message_queue``."""
    consumer = KafkaMessageConsumer(
        bootstrap_servers=config.kafka_url,
        consumer_topic="scraper",
        group_id="highscore-api",
        message_queue=self.message_queue,
    )
    self.message_consumer = consumer

async def process_batch(self, batch: list[dict]):
async def start(self):
    """Run the consumer and processor forever, rebuilding the consumer after errors.

    Each loop iteration creates a fresh Kafka consumer (initialize), runs the
    consume loop and the batch processor concurrently, and backs off 5 seconds
    before restarting when either side fails.
    """
    logger.info("starting")
    while True:
        await self.initialize()
        consumer_task = asyncio.create_task(
            self.message_consumer.consume_messages_continuously()
        )
        processor_task = asyncio.create_task(self.process_messages())
        try:
            await asyncio.gather(consumer_task, processor_task)
        except Exception:
            # logger.exception records the traceback; the old f-string error
            # log lost it (and misspelled "occurred").
            logger.exception("Error occurred")
        finally:
            # gather() does not cancel the surviving task when the other one
            # fails, so without explicit cancellation every restart iteration
            # would leak a duplicate consumer/processor task.
            consumer_task.cancel()
            processor_task.cancel()
            await self.message_consumer.stop()
            await asyncio.sleep(5)

async def process_messages(self):
    """Accumulate queued messages into ``self.batch`` and flush periodically.

    A flush happens when the batch reaches ``batch_size`` items, or when more
    than a minute has passed since the last flush, the batch is non-empty, and
    the queue is drained.
    """
    logger.info("start processing messages")
    # monotonic() is immune to wall-clock adjustments; time.time() could make
    # the interval negative or jump forward.
    last_send = time.monotonic()
    while True:
        message: dict = await self.message_queue.get()

        self.batch.append(message)

        # Seconds since the last flush, floored at 1 to avoid division by
        # zero in the throughput log below.
        delta_send_time = max(int(time.monotonic() - last_send), 1)

        qsize = self.message_queue.qsize()

        # was `>` — that flushed at batch_size + 1 items instead of batch_size
        if len(self.batch) >= self.batch_size or (
            delta_send_time > 60 and self.batch and qsize == 0
        ):
            last_send = time.monotonic()
            await self.process_batch()

            logger.info(f"{qsize=} - {len(self.batch)/delta_send_time:.2f} it/s")
            self.batch = []

        self.message_queue.task_done()

async def process_batch(self):
# Lists to store processed highscores and players
highscores: list[SchemaHiscore] = []
players: list[SchemaPlayer] = []

# Process each row of data containing 'hiscores' and 'player' information
for row in batch:
for row in self.batch:
row: dict
# Extract 'hiscores' and 'player' dictionaries from the row
highscore = row.get("hiscores")
player = row.get("player")
Expand Down
115 changes: 0 additions & 115 deletions src/kafka/kafka.py

This file was deleted.

11 changes: 9 additions & 2 deletions src/kafka/abc.py → src/kafka/modules/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ async def process_batch(self, batch: list[dict]):
pass


class AbstractMP(ABC):
class AbstractProcessor(ABC):
    """Interface for components that consume messages and flush them in batches.

    Concrete processors (e.g. HighscoreProcessor) must implement both hooks.
    """

    def __init__(self) -> None:
        super().__init__()

    @abstractmethod
    async def process_batch(self) -> None:
        """Persist/flush the currently accumulated batch."""

    @abstractmethod
    async def process_message(self):
        """Handle a single consumed message."""
54 changes: 54 additions & 0 deletions src/kafka/modules/kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import asyncio
import json
from asyncio import Queue

from aiokafka import AIOKafkaConsumer
import logging

logger = logging.getLogger(__name__)


class KafkaMessageConsumer:
    """Consumes JSON messages from a Kafka topic onto an asyncio Queue.

    Owns the AIOKafkaConsumer lifecycle; downstream processors read decoded
    dicts from ``message_queue``.
    """

    def __init__(
        self, bootstrap_servers, consumer_topic, group_id, message_queue: Queue
    ):
        self.bootstrap_servers = bootstrap_servers
        self.consumer_topic = consumer_topic
        self.group_id = group_id
        self.message_queue = message_queue
        # Created lazily in start(); None means "not started".
        self.consumer = None

    async def start(self):
        """Create and start the underlying AIOKafkaConsumer."""
        logger.info("starting")
        # The ``loop`` argument was deprecated and later removed from aiokafka;
        # the consumer picks up the running event loop automatically.
        self.consumer = AIOKafkaConsumer(
            self.consumer_topic,
            group_id=self.group_id,
            bootstrap_servers=self.bootstrap_servers,
        )
        await self.consumer.start()

    async def stop(self):
        """Stop the consumer if it was started."""
        logger.info("stopping")
        if self.consumer:
            await self.consumer.stop()

    async def consume_messages(self):
        """Decode each Kafka record as UTF-8 JSON and enqueue it for processing."""
        logger.info("start consuming messages")
        try:
            async for message in self.consumer:
                decoded = json.loads(message.value.decode("utf-8"))
                await self.message_queue.put(decoded)
        except Exception:
            # was print() — use the module logger so the traceback reaches the
            # application log stream.
            logger.exception("Error consuming message")

    async def consume_messages_continuously(self):
        """Restart the consume loop after failures, with a 5s backoff."""
        while True:
            await self.start()
            try:
                await self.consume_messages()
            finally:
                await self.stop()

            await asyncio.sleep(5)
39 changes: 39 additions & 0 deletions src/kafka/modules/kafka_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio
from asyncio import Queue
from aiokafka import AIOKafkaProducer


class KafkaMessageProducer:
    """Publishes UTF-8 encoded messages to a single Kafka topic."""

    def __init__(self, bootstrap_servers, producer_topic):
        self.bootstrap_servers = bootstrap_servers
        self.producer_topic = producer_topic
        # Created lazily in start(); None means "not started".
        self.producer = None

    async def start(self):
        """Create and start the underlying AIOKafkaProducer."""
        # The ``loop`` argument was deprecated and later removed from aiokafka;
        # the producer picks up the running event loop automatically.
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
        )
        await self.producer.start()

    async def stop(self):
        """Stop the producer if it was started."""
        if self.producer:
            await self.producer.stop()

    async def produce_message(self, message):
        """Send one message (best effort: errors are reported, not raised)."""
        try:
            await self.producer.send(self.producer_topic, message.encode("utf-8"))
        except Exception as e:
            print(f"Error producing message: {e}")

    async def produce_messages_from_queue(self, producer_queue: Queue):
        """Drain *producer_queue* forever, restarting the producer on failure.

        Bug fix: ``asyncio.Queue`` is not an async iterator, so the original
        ``async for message in producer_queue`` raised TypeError on the first
        iteration; use ``await producer_queue.get()`` in a loop instead.
        """
        while True:
            await self.start()
            try:
                while True:
                    message = await producer_queue.get()
                    await self.produce_message(message)
                    producer_queue.task_done()
            except Exception as e:
                # Without this, any failure escaped the outer loop and the
                # intended 5s-backoff restart never happened.
                print(f"Error in producer loop: {e}")
            finally:
                await self.stop()

            await asyncio.sleep(5)

0 comments on commit 2f4152f

Please sign in to comment.