kafka setup
extreme4all committed Nov 10, 2023
1 parent 6210dcd commit 9a29b9e
Showing 8 changed files with 179 additions and 66 deletions.
127 changes: 64 additions & 63 deletions docker-compose-env.yml
@@ -1,68 +1,69 @@
version: '3'
services:
automation:
build:
context: .
dockerfile: Dockerfile
container_name: bd-automation
env_file:
- .env
networks:
- botdetector-network
depends_on:
- kafka
- api
- mysql
mysql:
build:
context: ../bot-detector-mysql
dockerfile: Dockerfile
image: bot-detector/bd-mysql:latest
environment:
- MYSQL_ROOT_PASSWORD=root_bot_buster
- MYSQL_USER=botssuck
- MYSQL_PASSWORD=botdetector
volumes:
- ../bot-detector-mysql/mount:/var/lib/mysql
- '../bot-detector-mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d'
ports:
- 3306:3306
networks:
- botdetector-network
api:
build:
context: ../Bot-Detector-Core-Files/
dockerfile: Dockerfile
target: base
args:
root_path: /
api_port: 5000
command: uvicorn src.core.server:app --host 0.0.0.0 --reload --reload-include src/*
container_name: bd-dev-api
environment:
- sql_uri=mysql+asyncmy://root:root_bot_buster@mysql:3306/playerdata
- discord_sql_uri=mysql+asyncmy://root:root_bot_buster@mysql:3306/discord
- token=verify_ban
volumes:
- ../Bot-Detector-Core-Files/src:/project/src:rw
ports:
- 5000:5000
networks:
- botdetector-network
depends_on:
- mysql
kafka:
image: bitnami/kafka:latest
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
ports:
- 9094:9094
networks:
- botdetector-network
kafka:
container_name: kafka
image: bitnami/kafka:3.5.1-debian-11-r3
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
# volumes:
# - ./kafka:/bitnami/kafka:rw
ports:
- 9094:9094
- 9092:9092
healthcheck:
test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "localhost:9092"]
interval: 30s
timeout: 10s
retries: 5
networks:
- botdetector-network

kafdrop:
container_name: kafdrop
image: obsidiandynamics/kafdrop:latest
environment:
- KAFKA_BROKERCONNECT=kafka:9092
- JVM_OPTS=-Xms32M -Xmx64M
- SERVER_SERVLET_CONTEXTPATH=/
ports:
- 9000:9000
restart: on-failure
networks:
- botdetector-network
depends_on:
kafka:
condition: service_healthy

kafka_setup:
image: bot-detector/kafka_setup
container_name: kafka_setup
build:
context: ./kafka_setup
environment:
- KAFKA_BROKER=kafka:9092
networks:
- botdetector-network
depends_on:
kafka:
condition: service_healthy

automation:
image: bot-detector/automation
build:
context: .
dockerfile: Dockerfile
container_name: bd-automation
env_file:
- .env
networks:
- botdetector-network
depends_on:
kafka:
condition: service_healthy

networks:
botdetector-network:
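
The two advertised listeners split traffic by origin: services on botdetector-network (kafdrop, kafka_setup, automation) reach the broker at kafka:9092, while tools running on the host go through the EXTERNAL listener published as localhost:9094. A minimal kafka-python sketch of both cases (not part of this commit; the sample message is illustrative):

```python
import os

from kafka import KafkaConsumer, KafkaProducer

# Inside the compose network use kafka:9092; from the host use localhost:9094.
BROKER = os.environ.get("KAFKA_BROKER", "localhost:9094")

# The "player" topic is created by the kafka_setup service above.
producer = KafkaProducer(bootstrap_servers=BROKER)
producer.send("player", b'{"name": "example"}')
producer.flush()

consumer = KafkaConsumer(
    "player",
    bootstrap_servers=BROKER,
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # give up after 5s of silence instead of blocking forever
)
for message in consumer:
    print(message.value)
    break
```
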
1 change: 1 addition & 0 deletions kafka_setup/.dockerignore
@@ -0,0 +1 @@
kafka_data/*.json
13 changes: 13 additions & 0 deletions kafka_setup/Dockerfile
@@ -0,0 +1,13 @@
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Default KAFKA_BROKER for running the script directly on the host;
# docker-compose overrides it to kafka:9092 at container runtime
ENV KAFKA_BROKER=localhost:9094

CMD ["python", "setup_kafka.py"]
Empty file added kafka_setup/kafka_data/.gitkeep
1 change: 1 addition & 0 deletions kafka_setup/requirements.txt
@@ -0,0 +1 @@
kafka-python
94 changes: 94 additions & 0 deletions kafka_setup/setup_kafka.py
@@ -0,0 +1,94 @@
# setup_kafka.py
import json
import os
import time
import zipfile

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic


def create_topics():
# Get the Kafka broker address from the environment variable
kafka_broker = os.environ.get("KAFKA_BROKER", "localhost:9094")

# Create Kafka topics
admin_client = KafkaAdminClient(bootstrap_servers=kafka_broker)

topics = admin_client.list_topics()
print("existing topics", topics)

    # drop any existing topics so they are recreated with the partition counts below
    if topics:
        admin_client.delete_topics(topics)

res = admin_client.create_topics(
[
NewTopic(
name="player",
num_partitions=3,
replication_factor=1,
),
NewTopic(
name="scraper",
num_partitions=4,
replication_factor=1,
),
NewTopic(
name="reports",
num_partitions=4,
replication_factor=1,
),
]
)

print("created_topic", res)

topics = admin_client.list_topics()
print("all topics", topics)
return


def send_json_to_kafka(file_path, producer, topic):
with open(file_path) as file:
data = json.load(file)

for record in data:
        # dict records are serialized by the producer's value_serializer
producer.send(topic, value=record)
return


def insert_data():
# Get the Kafka broker address from the environment variable
kafka_broker = os.environ.get("KAFKA_BROKER", "localhost:9094")

zip_file_path = "kafka_data/kafka_data.zip"
extracted_folder = "kafka_data"

print("Extracting data from the zip archive...")
# Extract the files from the zip archive
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
zip_ref.extractall(extracted_folder)

# Create the Kafka producer
producer = KafkaProducer(
bootstrap_servers=kafka_broker,
value_serializer=lambda x: json.dumps(x).encode(),
)

for file_name in os.listdir(extracted_folder):
if file_name.endswith(".json"):
file_path = os.path.join(extracted_folder, file_name)
print(f"Processing file: {file_path}")
send_json_to_kafka(file_path, producer, "player")

print("Data insertion completed.")


def setup_kafka():
create_topics()
insert_data()


if __name__ == "__main__":
setup_kafka()
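
One caveat in create_topics: Kafka deletes topics asynchronously, so calling create_topics immediately after delete_topics can race and fail with TopicAlreadyExistsError while the broker is still cleaning up. A hedged sketch of a wait loop that could sit between the two calls (the polling interval and timeout are assumptions, not from this commit):

```python
import time

from kafka.admin import KafkaAdminClient


def wait_for_deletion(admin_client: KafkaAdminClient, names: list[str], timeout: float = 30.0) -> None:
    # delete_topics() returns before the broker finishes; poll list_topics()
    # until the names are gone so a follow-up create_topics() cannot race.
    deadline = time.time() + timeout
    remaining = set(admin_client.list_topics()) & set(names)
    while remaining and time.time() < deadline:
        time.sleep(0.5)
        remaining = set(admin_client.list_topics()) & set(names)
    if remaining:
        raise TimeoutError(f"topics still present after {timeout}s: {remaining}")
```
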
5 changes: 4 additions & 1 deletion notes.md
@@ -31,4 +31,7 @@ powershell "(Get-Content requirements.txt) | ForEach-Object { $_ -replace '==', '>=' } | Set-Content requirements.txt"
call pip install -r requirements.txt --upgrade
call pip freeze > requirements.txt
powershell "(Get-Content requirements.txt) | ForEach-Object { $_ -replace '>=', '==' } | Set-Content requirements.txt"
```

kubectl port-forward -n kafka svc/bd-prd-kafka-service 9094:9094
kubectl port-forward -n database svc/mysql 3306:3306
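
Both forwards can be smoke-tested from the host before pointing any jobs at them; a small sketch (not from the repo, and it assumes the forwarded broker advertises a listener reachable as localhost:9094):

```python
import socket

from kafka.admin import KafkaAdminClient

# MySQL: a plain TCP connect to the forwarded port works as a liveness probe.
with socket.create_connection(("localhost", 3306), timeout=5):
    print("mysql port-forward is up")

# Kafka: listing topics exercises the full client handshake, not just the socket.
admin = KafkaAdminClient(bootstrap_servers="localhost:9094")
print("topics:", admin.list_topics())
```
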
4 changes: 2 additions & 2 deletions src/jobs/kafka/players/main.py
@@ -71,7 +71,7 @@ async def parse_data(players: list[dict]):

async def get_request(
url: str, params: dict, headers: dict = {}
) -> tuple(list[dict], Any):
) -> tuple[list[dict], Any]:
data = None
error = None
async with aiohttp.ClientSession() as session:
@@ -111,7 +111,7 @@ async def get_data(receive_queue: Queue):
continue

players = await parse_data(players=players)

logger.info(f"Received: {len(players)=}")
await asyncio.gather(*[receive_queue.put(item=p) for p in players])

if len(players) < APPCONFIG.BATCH_SIZE:
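
The one-character change above is load-bearing: the old spelling tuple(list[dict], Any) calls tuple() with two arguments and raises a TypeError when the annotation is evaluated, whereas tuple[list[dict], Any] is a subscripted generic describing the (data, error) pair. A hedged sketch of that pattern, since the function body is collapsed in this diff (the status handling and error string are assumptions):

```python
from typing import Any

import aiohttp


async def get_request(url: str, params: dict, headers: dict = {}) -> tuple[list[dict], Any]:
    # Convention sketched here: exactly one of (data, error) is None on return.
    data, error = None, None
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params, headers=headers) as resp:
            if resp.status == 200:
                data = await resp.json()
            else:
                error = f"status={resp.status}"
    return data, error
```
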
