From 9a29b9e474193effad41e1aa9470635e63ef4a14 Mon Sep 17 00:00:00 2001 From: extreme4all <> Date: Fri, 10 Nov 2023 01:44:55 +0100 Subject: [PATCH] kafka setup --- docker-compose-env.yml | 127 ++++++++++++++++---------------- kafka_setup/.dockerignore | 1 + kafka_setup/Dockerfile | 13 ++++ kafka_setup/kafka_data/.gitkeep | 0 kafka_setup/requirements.txt | 1 + kafka_setup/setup_kafka.py | 94 +++++++++++++++++++++++ notes.md | 5 +- src/jobs/kafka/players/main.py | 4 +- 8 files changed, 179 insertions(+), 66 deletions(-) create mode 100644 kafka_setup/.dockerignore create mode 100644 kafka_setup/Dockerfile create mode 100644 kafka_setup/kafka_data/.gitkeep create mode 100644 kafka_setup/requirements.txt create mode 100644 kafka_setup/setup_kafka.py diff --git a/docker-compose-env.yml b/docker-compose-env.yml index fbe9f08..782e66d 100644 --- a/docker-compose-env.yml +++ b/docker-compose-env.yml @@ -1,68 +1,69 @@ version: '3' services: - automation: - build: - context: . - dockerfile: Dockerfile - container_name: bd-automation - env_file: - - .env - networks: - - botdetector-network - depends_on: - - kafka - - api - - mysql - mysql: - build: - context: ../bot-detector-mysql - dockerfile: Dockerfile - image: bot-detector/bd-mysql:latest - environment: - - MYSQL_ROOT_PASSWORD=root_bot_buster - - MYSQL_USER=botssuck - - MYSQL_PASSWORD=botdetector - volumes: - - ../bot-detector-mysql/mount:/var/lib/mysql - - '../bot-detector-mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d' - ports: - - 3306:3306 - networks: - - botdetector-network - api: - build: - context: ../Bot-Detector-Core-Files/ - dockerfile: Dockerfile - target: base - args: - root_path: / - api_port: 5000 - command: uvicorn src.core.server:app --host 0.0.0.0 --reload --reload-include src/* - container_name: bd-dev-api - environment: - - sql_uri=mysql+asyncmy://root:root_bot_buster@mysql:3306/playerdata - - discord_sql_uri=mysql+asyncmy://root:root_bot_buster@mysql:3306/discord - - token=verify_ban - volumes: - - ../Bot-Detector-Core-Files/src:/project/src:rw - ports: - - 5000:5000 - networks: - - botdetector-network - depends_on: - - mysql - kafka: - image: bitnami/kafka:latest - environment: - - ALLOW_PLAINTEXT_LISTENER=yes - - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094 - - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true - ports: - - 9094:9094 - networks: - - botdetector-network + kafka: + container_name: kafka + image: bitnami/kafka:3.5.1-debian-11-r3 + environment: + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094 + - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false + # volumes: + # - ./kafka:/bitnami/kafka:rw + ports: + - 9094:9094 + - 9092:9092 + healthcheck: + test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "localhost:9092"] + interval: 30s + timeout: 10s + retries: 5 + networks: + - botdetector-network + + kafdrop: + container_name: kafdrop + image: obsidiandynamics/kafdrop:latest + environment: + - KAFKA_BROKERCONNECT=kafka:9092 + - JVM_OPTS=-Xms32M -Xmx64M + - SERVER_SERVLET_CONTEXTPATH=/ + ports: + - 9000:9000 + restart: on-failure + networks: + - botdetector-network + depends_on: + kafka: + condition: service_healthy + + kafka_setup: + image: bot-detector/kafka_setup + container_name: kafka_setup + build: + context: ./kafka_setup + environment: + - KAFKA_BROKER=kafka:9092 + networks: + - botdetector-network + depends_on: + kafka: + condition: service_healthy + + automation: + image: bot-detector/automation + build: + context: . + dockerfile: Dockerfile + container_name: bd-automation + env_file: + - .env + networks: + - botdetector-network + depends_on: + kafka: + condition: service_healthy networks: botdetector-network: diff --git a/kafka_setup/.dockerignore b/kafka_setup/.dockerignore new file mode 100644 index 0000000..121473d --- /dev/null +++ b/kafka_setup/.dockerignore @@ -0,0 +1 @@ +kafka_data/*.json \ No newline at end of file diff --git a/kafka_setup/Dockerfile b/kafka_setup/Dockerfile new file mode 100644 index 0000000..9fec847 --- /dev/null +++ b/kafka_setup/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +# Set the KAFKA_BROKER environment variable during container runtime +ENV KAFKA_BROKER=localhost:9094 + +CMD ["python", "setup_kafka.py"] \ No newline at end of file diff --git a/kafka_setup/kafka_data/.gitkeep b/kafka_setup/kafka_data/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/kafka_setup/requirements.txt b/kafka_setup/requirements.txt new file mode 100644 index 0000000..34aa892 --- /dev/null +++ b/kafka_setup/requirements.txt @@ -0,0 +1 @@ +kafka-python \ No newline at end of file diff --git a/kafka_setup/setup_kafka.py b/kafka_setup/setup_kafka.py new file mode 100644 index 0000000..2c68575 --- /dev/null +++ b/kafka_setup/setup_kafka.py @@ -0,0 +1,94 @@ +# setup_kafka.py +import json +import os +import time +import zipfile + +from kafka import KafkaProducer +from kafka.admin import KafkaAdminClient, NewTopic + + +def create_topics(): + # Get the Kafka broker address from the environment variable + kafka_broker = os.environ.get("KAFKA_BROKER", "localhost:9094") + + # Create Kafka topics + admin_client = KafkaAdminClient(bootstrap_servers=kafka_broker) + + topics = admin_client.list_topics() + print("existing topics", topics) + + if not topics == []: + admin_client.delete_topics(topics) + + res = admin_client.create_topics( + [ + NewTopic( + name="player", + num_partitions=3, + replication_factor=1, + ), + NewTopic( + name="scraper", + num_partitions=4, + replication_factor=1, + ), + NewTopic( + name="reports", + num_partitions=4, + replication_factor=1, + ), + ] + ) + + print("created_topic", res) + + topics = admin_client.list_topics() + print("all topics", topics) + return + + +def send_json_to_kafka(file_path, producer, topic): + with open(file_path) as file: + data = json.load(file) + + for record in data: + # record = json.dumps(record).encode("utf-8") + producer.send(topic, value=record) + return + + +def insert_data(): + # Get the Kafka broker address from the environment variable + kafka_broker = os.environ.get("KAFKA_BROKER", "localhost:9094") + + zip_file_path = "kafka_data/kafka_data.zip" + extracted_folder = "kafka_data" + + print("Extracting data from the zip archive...") + # Extract the files from the zip archive + with zipfile.ZipFile(zip_file_path, "r") as zip_ref: + zip_ref.extractall(extracted_folder) + + # Create the Kafka producer + producer = KafkaProducer( + bootstrap_servers=kafka_broker, + value_serializer=lambda x: json.dumps(x).encode(), + ) + + for file_name in os.listdir(extracted_folder): + if file_name.endswith(".json"): + file_path = os.path.join(extracted_folder, file_name) + print(f"Processing file: {file_path}") + send_json_to_kafka(file_path, producer, "player") + + print("Data insertion completed.") + + +def setup_kafka(): + create_topics() + insert_data() + + +if __name__ == "__main__": + setup_kafka() diff --git a/notes.md b/notes.md index 7d8ee5d..100fe64 100644 --- a/notes.md +++ b/notes.md @@ -31,4 +31,7 @@ powershell "(Get-Content requirements.txt) | ForEach-Object { $_ -replace '==', call pip install -r requirements.txt --upgrade call pip freeze > requirements.txt powershell "(Get-Content requirements.txt) | ForEach-Object { $_ -replace '>=', '==' } | Set-Content requirements.txt" -``` \ No newline at end of file +``` + +kubectl port-forward -n kafka svc/bd-prd-kafka-service 9094:9094 +kubectl port-forward -n database svc/mysql 3306:3306 \ No newline at end of file diff --git a/src/jobs/kafka/players/main.py b/src/jobs/kafka/players/main.py index 2c6260b..7ba2462 100644 --- a/src/jobs/kafka/players/main.py +++ b/src/jobs/kafka/players/main.py @@ -71,7 +71,7 @@ async def parse_data(players: list[dict]): async def get_request( url: str, params: dict, headers: dict = {} -) -> tuple(list[dict], Any): +) -> tuple[list[dict], Any]: data = None error = None async with aiohttp.ClientSession() as session: @@ -111,7 +111,7 @@ async def get_data(receive_queue: Queue): continue players = await parse_data(players=players) - + logger.info(f"Received: {len(players)=}") await asyncio.gather(*[receive_queue.put(item=p) for p in players]) if len(players) < APPCONFIG.BATCH_SIZE: