diff --git a/.vscode/settings.json b/.vscode/settings.json
index 30ee50f..c8d9827 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,11 +1,8 @@
 {
-    "python.testing.pytestArgs": [
-        "src"
-    ],
     "python.testing.unittestEnabled": false,
     "python.testing.pytestEnabled": true,
     "[python]": {
-        "editor.defaultFormatter": "ms-python.black-formatter",
+        "editor.defaultFormatter": "charliermarsh.ruff",
         "editor.formatOnSave": true,
         "editor.codeActionsOnSave": {
            "source.organizeImports": "explicit"
diff --git a/Dockerfile b/Dockerfile
index bacbed4..cc280a3 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,5 +1,5 @@
 # Base stage
-FROM python:3.11-slim as base
+FROM python:3.11-slim AS base
 
 # Keeps Python from generating .pyc files in the container
 ENV PYTHONDONTWRITEBYTECODE=1
@@ -17,18 +17,16 @@ RUN pip install --no-cache-dir -r requirements.txt
 # copy the scripts to the folder
 COPY ./src /project/src
 
-# Production stage for highscore
-FROM base as production-highscore
+# Non-root user setup stage
+FROM base AS user-setup
 # Creates a non-root user with an explicit UID and adds permission to access the /project folder
 RUN adduser -u 5678 --disabled-password --gecos "" appuser && chown -R appuser /project
 USER appuser
 
+# Production stage for highscore
+FROM user-setup AS production-highscore
-CMD ["python", "src/main_highscore.py"]
+CMD ["python", "src/main.py", "--run-hs-worker"]
 
 # Production stage for runemetrics
-FROM base as production-runemetrics
-# Creates a non-root user with an explicit UID and adds permission to access the /project folder
-RUN adduser -u 5678 --disabled-password --gecos "" appuser && chown -R appuser /project
-USER appuser
-
+FROM user-setup AS production-runemetrics
-CMD ["python", "src/main_runemetrics.py"]
+CMD ["python", "src/main.py", "--run-rm-worker"]
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..71102d7
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,33 @@
+clean-pyc: ## Clean Python cache files
+	find . \( -name '*.pyc' -o -name '*.pyo' -o -name '*~' -o -name '__pycache__' -o -name '.pytest_cache' \) -exec rm -rf {} +
+
+clean-test: ## Cleanup pytest leftovers
+	rm -f .coverage
+	rm -fr htmlcov/
+	rm -fr test_results/
+	rm -f *report.html log.html test-results.html output.xml
+
+docker-restart: ## Restart containers
+	docker compose down
+	docker compose up --build -d
+
+docker-test: docker-restart ## Restart containers & run tests
+	pytest
+
+docker-test-verbose: docker-restart ## Restart containers & run tests with verbosity
+	pytest -s
+
+pre-commit-setup: ## Install pre-commit
+	python3 -m pip install pre-commit
+	pre-commit --version
+
+requirements: ## Install all requirements
+	python3 -m pip install -r requirements.txt
+	python3 -m pip install -r requirements-test.txt
+	python3 -m pip install ruff
+
+setup: requirements pre-commit-setup ## Set up all requirements
+
+docs: ## Open your browser to the web apps testing docs
+	@echo "Opening documentation..."
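+	# xdg-open covers Linux; `open` is the macOS fallback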
+ xdg-open http://localhost:5000/docs || open http://localhost:5000/docs diff --git a/docker-compose-kafka.yml b/docker-compose-kafka.yml index 29fe348..095720d 100644 --- a/docker-compose-kafka.yml +++ b/docker-compose-kafka.yml @@ -1,4 +1,3 @@ -version: '3' services: kafka: container_name: kafka diff --git a/docker-compose.yml b/docker-compose.yml index 4ff0695..eb1bd6c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3' services: kafka: container_name: kafka @@ -57,16 +56,20 @@ services: depends_on: kafka: condition: service_healthy - + scraper_highscore: - image: bot_detector/scraper_highscore - container_name: scraper_highscore + image: bd/scraper_hiscore + container_name: scraper_hiscore build: context: . dockerfile: Dockerfile target: base - command: python src/main_highscore.py + command: > + sh -c "pip install watchdog && watchmedo auto-restart --patterns='*.py' --recursive -- python src/main.py --run-hs-worker" restart: on-failure + ports: + # host:container + - 8000:8000 networks: - botdetector-network env_file: @@ -76,16 +79,20 @@ services: depends_on: kafka: condition: service_healthy - + scraper_runemetrics: - image: bot_detector/scraper_runemetrics + image: bd/scraper_runemetrics container_name: scraper_runemetrics build: context: . dockerfile: Dockerfile target: base - command: python src/main_runemetrics.py + command: > + sh -c "pip install watchdog && watchmedo auto-restart --patterns='*.py' --recursive -- python src/main.py --run-rm-worker" restart: on-failure + ports: + # host:container + - 8001:8000 networks: - botdetector-network env_file: @@ -95,5 +102,6 @@ services: depends_on: kafka: condition: service_healthy + networks: botdetector-network: diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..6e42495 --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,2 @@ +pytest==8.3.3 +pytest-asyncio==0.24.0 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index ec01f28..28974ed 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,30 +1,45 @@ -aiohttp==3.9.0 -aiokafka==0.8.1 -aiosignal==1.3.1 -async-timeout==4.0.2 -attrs==22.1.0 -black==22.12.0 -certifi==2023.7.22 -cffi==1.15.1 -chardet==5.1.0 -charset-normalizer==2.1.1 -click==8.1.3 -colorama==0.4.6 -frozenlist==1.3.3 -idna==3.4 -kafka-python==2.0.2 -multidict==6.0.3 -mypy-extensions==0.4.3 -packaging==23.1 -pathspec==0.10.3 -platformdirs==2.6.0 -pycares==4.3.0 -pycparser==2.21 -pydantic==1.10.8 -python-dotenv==0.21.0 -requests==2.31.0 -rfc3339==6.2 -tomli==2.0.1 -typing_extensions==4.4.0 -urllib3==1.26.13 -yarl==1.8.2 +aiohappyeyeballs==2.4.3 +aiohttp==3.10.10 +aiokafka==0.12.0 +aiosignal==1.3.1 +annotated-types==0.7.0 +async-timeout==4.0.3 +attrs==24.2.0 +black==24.10.0 +certifi==2024.8.30 +cffi==1.17.1 +cfgv==3.4.0 +chardet==5.2.0 +charset-normalizer==3.4.0 +click==8.1.7 +colorama==0.4.6 +distlib==0.3.9 +filelock==3.16.1 +frozenlist==1.5.0 +identify==2.6.1 +idna==3.10 +kafka-python==2.0.2 +multidict==6.1.0 +mypy-extensions==1.0.0 +nodeenv==1.9.1 +osrs==0.0.13 +packaging==24.1 +pathspec==0.12.1 +platformdirs==4.3.6 +pre_commit==4.0.1 +prometheus_client==0.21.0 +propcache==0.2.0 +pycares==4.4.0 +pycparser==2.22 +pydantic==2.9.2 +pydantic-settings==2.6.0 +pydantic_core==2.23.4 +python-dotenv==1.0.1 +PyYAML==6.0.2 +requests==2.32.3 +rfc3339==6.2 +tomli==2.0.2 +typing_extensions==4.12.2 +urllib3==2.2.3 +virtualenv==20.27.1 +yarl==1.17.0 diff --git a/src/modules/__init__.py b/src/__init__.py similarity index 100% 
rename from src/modules/__init__.py
rename to src/__init__.py
diff --git a/src/config/__init__.py b/src/config/__init__.py
new file mode 100644
index 0000000..fc925f6
--- /dev/null
+++ b/src/config/__init__.py
@@ -0,0 +1,3 @@
+from config import logging
+
+__all__ = ["logging"]
diff --git a/src/config/config.py b/src/config/config.py
index 90e19bf..df40036 100644
--- a/src/config/config.py
+++ b/src/config/config.py
@@ -1,8 +1,6 @@
 import os
 
-from pydantic import BaseSettings
-
-from config import logging
+from pydantic_settings import BaseSettings
 
 
 class AppConfig(BaseSettings):
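The `BaseSettings` import change above is the pydantic v1 -> v2 migration (matching the `pydantic==2.9.2` and `pydantic-settings==2.6.0` pins in requirements.txt). A minimal sketch of what that migration looks like; the field name `KAFKA_HOST` comes from this repo, the default value is illustrative only:

```python
# pydantic v2: BaseSettings moved to the pydantic-settings package
from pydantic_settings import BaseSettings  # v1 was: from pydantic import BaseSettings


class AppConfig(BaseSettings):
    KAFKA_HOST: str = "localhost:9094"  # illustrative default, not the repo's


config = AppConfig()
print(config.model_dump())  # v1 spelling was config.dict()
```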
diff --git a/src/config/logging.py b/src/config/logging.py
index 1bec6f3..cdb60ee 100644
--- a/src/config/logging.py
+++ b/src/config/logging.py
@@ -18,9 +18,21 @@
     # file_handler # this is good for debugging
 ]
 
+
+# | Level                | Numeric Value | What it Means / When to Use It |
+# |----------------------|---------------|--------------------------------------------------------------------------------------------------|
+# | `logging.NOTSET`     | 0             | When set on a logger, indicates that ancestor loggers are to be consulted to determine the effective level. If that still resolves to NOTSET, then all events are logged. When set on a handler, all events are handled. |
+# | `logging.DEBUG`      | 10            | Detailed information, typically only of interest to a developer trying to diagnose a problem. |
+# | `logging.INFO`       | 20            | Confirmation that things are working as expected. |
+# | `logging.WARNING`    | 30            | An indication that something unexpected happened, or that a problem might occur in the near future (e.g., ‘disk space low’). The software is still working as expected. |
+# | `logging.ERROR`      | 40            | Due to a more serious problem, the software has not been able to perform some function. |
+# | `logging.CRITICAL`   | 50            | A serious error, indicating that the program itself may be unable to continue running. |
+
 logging.basicConfig(level=logging.DEBUG, handlers=handlers)
 
 logging.getLogger("urllib3").setLevel(logging.INFO)
 # logging.getLogger("modules.scraper").setLevel(logging.WARNING)
 logging.getLogger("modules.api").setLevel(logging.WARNING)
 logging.getLogger("aiokafka").setLevel(logging.INFO)
+logging.getLogger("osrs.async_api.osrs.hiscores").setLevel(logging.INFO)
diff --git a/src/hiscore/__init__.py b/src/hiscore/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/hiscore/main.py b/src/hiscore/main.py
new file mode 100644
index 0000000..34c7465
--- /dev/null
+++ b/src/hiscore/main.py
@@ -0,0 +1,307 @@
+import asyncio
+import json
+import logging
+import time
+
+from aiohttp import ClientResponseError, ClientSession
+from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
+from osrs.async_api.osrs.hiscores import Hiscore, Mode, RateLimiter
+from osrs.exceptions import PlayerDoesNotExist, Undefined, UnexpectedRedirection
+from prometheus_client import Counter, Histogram, start_http_server
+from pydantic import BaseModel
+
+from config.config import app_config
+from modules.api.webshare_api import Webshare
+
+logger = logging.getLogger(__name__)
+
+KAFKA_TOPIC_HIGHSCORE = "player"
+
+# Define Prometheus metrics
+total_counter = Counter(
+    name="highscore_request_count",
+    documentation="Count of requested player stats fetches",
+)
+success_counter = Counter(
+    name="highscore_success_count",
+    documentation="Count of successful player stats fetches",
+    labelnames=["proxy"],
+)
+error_counter = Counter(
+    name="highscore_error_count",
+    documentation="Count of failed player stats fetches",
+    labelnames=["proxy"],
+)
+not_found_counter = Counter(
+    name="highscore_not_found_count",
+    documentation="Count of players not found",
+    labelnames=["proxy"],
+)
+latency_histogram = Histogram(
+    name="highscore_fetch_latency_seconds",
+    documentation="Latency of player stats fetches",
+    labelnames=["proxy"],
+)
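+
+# NOTE: start_http_server(8000) in main() below serves these on /metrics.
+# prometheus_client exposes each Counter with a *_total suffix (e.g.
+# highscore_success_count_total) and the Histogram as *_bucket/_sum/_count
+# series, one set per proxy label.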
+
+
+class Player(BaseModel):
+    id: int
+    name: str
+    created_at: str | None
+    updated_at: str | None
+    possible_ban: int
+    confirmed_ban: int
+    confirmed_player: int
+    label_id: int
+    label_jagex: int
+
+
+hiscore_mapper = {
+    "league_points": "league",
+    "clue_scrolls_all": "cs_all",
+    "clue_scrolls_beginner": "cs_beginner",
+    "clue_scrolls_easy": "cs_easy",
+    "clue_scrolls_medium": "cs_medium",
+    "clue_scrolls_hard": "cs_hard",
+    "clue_scrolls_elite": "cs_elite",
+    "clue_scrolls_master": "cs_master",
+    "theatre_of_blood_hard_mode": "theatre_of_blood_hard",
+    "tombs_of_amascut_expert_mode": "tombs_of_amascut_expert",
+}
+
+
+class HighscoreWorkerManager:
+    def __init__(self, proxies, kafka_servers, kafka_topic, kafka_group):
+        self.proxies = proxies
+        self.kafka_servers = kafka_servers
+        self.kafka_topic = kafka_topic
+        self.kafka_group = kafka_group
+        self.player_queue = asyncio.Queue(maxsize=100)
+        self.player_not_found_queue = asyncio.Queue()
+        self.semaphore = asyncio.Semaphore(value=25)
+        self.kafka_producer: AIOKafkaProducer = None
+        self.kafka_consumer: AIOKafkaConsumer = None
+
+    async def setup_kafka_consumer(self):
+        if self.kafka_consumer is not None:
+            logger.warning("Consumer already started")
+            return
+
+        self.kafka_consumer = AIOKafkaConsumer(
+            self.kafka_topic,
+            bootstrap_servers=self.kafka_servers,
+            group_id=self.kafka_group,
+            value_deserializer=lambda x: json.loads(x.decode("utf-8")),
+            auto_offset_reset="earliest",
+        )
+        await self.kafka_consumer.start()
+
+    async def setup_kafka_producer(self):
+        if self.kafka_producer is not None:
+            logger.warning("Producer already started")
+            return
+
+        self.kafka_producer = AIOKafkaProducer(
+            bootstrap_servers=self.kafka_servers,
+            value_serializer=lambda v: json.dumps(v).encode(),
+            acks="all",
+        )
+        await self.kafka_producer.start()
+
+    async def produce_message(self, topic: str, msg: dict):
+        """Produce player data to Kafka."""
+        if not self.kafka_producer:
+            logger.error("Kafka Producer is None")
+            raise Exception("Kafka Producer is None")
+
+        if not isinstance(msg, dict):
+            logger.error(f"Expected type dict, got {type(msg)}")
+            raise Exception(f"Expected type dict, got {type(msg)}")
+
+        await self.kafka_producer.send(
+            topic=topic,
+            value=msg,
+        )
+
+    async def produce_player(self, topic: str, player: Player):
+        """Produce player back to a Kafka topic."""
+
+        topics = ["player", "scraper-runemetrics"]
+        if topic not in topics:
+            err = f"Unsupported Topic, received: {topic} expected value in {topics=}"
+            logger.error(err)
+            raise Exception(err)
+
+        if not isinstance(player, Player):
+            err = f"Expected type Player, Received: {type(player)}, {player=}"
+            logger.error(err)
+            raise Exception(err)
+
+        await self.produce_message(
+            topic=topic,
+            msg=player.model_dump(),
+        )
+
+    async def consume_player(self):
+        if self.kafka_consumer is None:
+            raise Exception("Kafka Consumer is None")
+
+        async for msg in self.kafka_consumer:
+            player = Player(**msg.value)
+            await self.player_queue.put(player)
+
+    def _parse_hiscore_name(self, name: str) -> str:
+        name = name.lower()
+        name = name.replace("'", "")
+        name = name.replace(" - ", " ")
+        name = name.replace("-", "_")
+        name = name.replace(":", "")
+        name = name.replace("(", "").replace(")", "")
+        name = name.replace(" ", "_")
+        # replaces "name" with its corresponding abbreviation from the
+        # "hiscore_mapper" dictionary, if one exists, or keeps the original name
+        name = hiscore_mapper.get(name, name)
+        return name
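+
+    # Worked examples of the normalization above (string rules + mapper):
+    #   "Clue Scrolls (master)"       -> "clue_scrolls_master" -> "cs_master"
+    #   "Theatre of Blood: Hard Mode" -> "theatre_of_blood_hard_mode" -> "theatre_of_blood_hard"
+    #   "Abyssal Sire"                -> "abyssal_sire" (no mapper entry, kept as-is)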
+
+    async def fetch_player_stats(
+        self, session: ClientSession, player: Player, hiscore_instance: Hiscore
+    ):
+        proxy = session._proxy
+        try:
+            total_counter.inc()
+            start_time = time.time()
+            pstats = await hiscore_instance.get(
+                mode=Mode.OLDSCHOOL,
+                player=player.name,
+                session=session,
+            )
+            latency = time.time() - start_time
+
+            # we know the player is not banned if he is on the highscores
+            player.possible_ban = 0
+            player.confirmed_ban = 0
+            player.label_jagex = 0
+
+            # parse data into dicts
+            activities = {
+                self._parse_hiscore_name(a.name): a.score
+                for a in pstats.activities
+                if a.score >= 0
+            }
+
+            skills = {
+                self._parse_hiscore_name(s.name): s.xp
+                for s in pstats.skills
+                if s.xp >= 0 and s.name != "Overall"
+            }
+
+            # merge into one dict
+            hiscore = activities | skills | {"total": sum(skills.values())}
+
+            # send data to the scraper
+            msg = {"player": player.model_dump(), "hiscore": hiscore}
+            await self.produce_message(topic="scraper", msg=msg)
+
+            # Increment success counter
+            success_counter.labels(proxy=proxy).inc()
+
+            # Record latency, labeled by proxy
+            latency_histogram.labels(proxy=proxy).observe(latency)
+        except PlayerDoesNotExist:
+            player.possible_ban = 1
+            await self.produce_player(topic="scraper-runemetrics", player=player)
+            not_found_counter.labels(proxy=proxy).inc()
+            return
+
+        except UnexpectedRedirection as e:
+            error_counter.labels(proxy=proxy).inc()
+            logger.error(f"Highscore page down for {player.name}: {e}")
+            await self.produce_player(topic=KAFKA_TOPIC_HIGHSCORE, player=player)
+            await asyncio.sleep(120)
+            return
+
+        except Undefined as e:
+            error_counter.labels(proxy=proxy).inc()
+            await self.produce_player(topic=KAFKA_TOPIC_HIGHSCORE, player=player)
+            logger.error(e)
+            raise Exception(e)  # Fail loud
+
+        except ClientResponseError as e:
+            error_counter.labels(proxy=proxy).inc()
+            await self.produce_player(topic=KAFKA_TOPIC_HIGHSCORE, player=player)
+            logger.error(e)
+            raise Exception(e)  # Fail loud
+        # NOTE: no `finally: return` here; it would swallow the fail-loud
+        # exceptions raised above
+
+    async def worker(self, proxy):
+        limiter = RateLimiter(calls_per_interval=60, interval=60)
+        hiscore_instance = Hiscore(proxy=proxy, rate_limiter=limiter)
+
+        async with ClientSession() as session:
+            # Store proxy for latency tracking
+            session._proxy = proxy.split("@")[1]
+
+            while True:
+                player = await self.player_queue.get()
+                self.player_queue.task_done()
+
+                # TODO: Stop signal
+                if player is None:
+                    logger.info("Received stop sentinel, exiting worker")
+                    break
+
+                # acquire before scheduling and release when the fetch finishes,
+                # so the semaphore actually bounds in-flight lookups
+                await self.semaphore.acquire()
+                task = asyncio.create_task(
+                    self.fetch_player_stats(session, player, hiscore_instance)
+                )
+                task.add_done_callback(lambda _: self.semaphore.release())
+
+    async def start_workers(self):
+        self.tasks = [asyncio.create_task(self.worker(p)) for p in self.proxies]
+
+    async def stop_workers(self):
+        logger.info("Stopping Workers")
+        await self.kafka_consumer.stop()
+        self.kafka_consumer = None
+
+        while any(not t.done() for t in self.tasks):
+            await self.player_queue.put(None)
+            l_tasks = len([t for t in self.tasks if not t.done()])
+            logger.info(f"Shutdown: running tasks: {l_tasks}")
+
+    async def run(self):
+        await self.setup_kafka_consumer()
+        await self.setup_kafka_producer()
+
+        try:
+            await self.start_workers()
+            await self.consume_player()
+            await self.stop_workers()
+        finally:
+            if self.kafka_consumer:
+                await self.kafka_consumer.stop()
+            if self.kafka_producer:
+                await self.kafka_producer.stop()
+
+
+async def main():
+    proxy_list = await Webshare(api_key=app_config.PROXY_API_KEY).get_proxies()
+    logger.info(f"gathered {len(proxy_list)} proxies")
+
+    # Start Prometheus metrics server
+    start_http_server(8000)
+
+    highscore_manager = HighscoreWorkerManager(
+        proxies=proxy_list,
+        kafka_servers=[app_config.KAFKA_HOST],
+        kafka_topic=KAFKA_TOPIC_HIGHSCORE,
+        kafka_group="scraper",
+    )
+
+    await highscore_manager.run()
+
+
+# Run the asynchronous main function
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/src/main.py b/src/main.py
new file mode 100644
index 0000000..73e58c4
--- /dev/null
+++ b/src/main.py
@@ -0,0 +1,32 @@
+import argparse
+import asyncio
+
+from hiscore.main import main as run_hiscore_worker
+from runemetrics.main import main as run_runemetrics_worker
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Scraper Worker Script")
+    parser.add_argument(
+        "--run-hs-worker",
+        action="store_true",
+        help="Start the highscore worker.",
+    )
+    parser.add_argument(
+        "--run-rm-worker",
+        action="store_true",
+        help="Start the runemetrics worker.",
+    )
+
+    args = parser.parse_args()
+
+    if args.run_hs_worker:
+        asyncio.run(run_hiscore_worker())
+    elif args.run_rm_worker:
+        asyncio.run(run_runemetrics_worker())
+    else:
+        parser.print_help()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/main_highscore.py b/src/main_highscore.py
deleted file mode 100644
index 003749d..0000000
--- a/src/main_highscore.py
+++ /dev/null
@@ -1,152 +0,0 @@
-import asyncio
-import logging
-import traceback
-import uuid
-from asyncio import Event, Queue
-
-from aiohttp import ClientSession, ClientTimeout
-
-from config.config import AppConfig
-from modules import _kafka
-from modules.api.webshare_api import Webshare
-from modules.scraper import 
HighScoreScraper, Scraper -from modules.validation.player import Player - -logger = logging.getLogger(__name__) - - -async def scrape( - player: dict, scraper: Scraper, session: ClientSession -) -> tuple[str, str]: - error = None - highscore = None - try: - player = Player(**player) - player, highscore = await scraper.lookup(player=player, session=session) - except Exception as error: - error_type = type(error) - logger.error( - { - "name": scraper.worker_name, - "error_type": error_type.__name__, - "error": error, - "player_name": player, - } - ) - tb_str = traceback.format_exc() - logger.error(f"{error}, \n{tb_str}") - return player, highscore, error - - -async def process_messages( - receive_queue: Queue, - send_queue: Queue, - error_queue: Queue, - runemetrics_send_queue: Queue, - shutdown_event: Event, - proxy: str, -): - name = str(uuid.uuid4())[-8:] - scraper = HighScoreScraper(proxy=proxy, worker_name=name) - timeout = ClientTimeout(total=AppConfig().SESSION_TIMEOUT) - - async with ClientSession(timeout=timeout) as session: - while not shutdown_event.is_set(): - if receive_queue.empty(): - await asyncio.sleep(1) - continue - - data = await receive_queue.get() - receive_queue.task_done() - player, highscore, error = await scrape( - player=data, scraper=scraper, session=session - ) - player: Player # can be cleaner probably - - if error is not None: - await error_queue.put(data) - continue - - if highscore is None: - await runemetrics_send_queue.put(player.dict()) - else: - await send_queue.put({"player": player.dict(), "hiscores": highscore}) - logger.info("shutdown") - - -async def get_proxies() -> list: - webshare = Webshare(api_key=AppConfig().PROXY_API_KEY) - proxy_list = await webshare.get_proxies() - logger.info(f"gathered {len(proxy_list)} proxies") - return proxy_list - - -async def main(): - shutdown_event = Event() - consumer = await _kafka.kafka_consumer(topic="player", group="scraper") - producer = await _kafka.kafka_producer() - - receive_queue = Queue(maxsize=500) - send_queue = Queue(maxsize=100) - error_queue = Queue(maxsize=500) - runemetrics_send_queue = Queue(maxsize=100) - - asyncio.create_task( - _kafka.receive_messages( - consumer=consumer, - receive_queue=receive_queue, - shutdown_event=shutdown_event, - ) - ) - - asyncio.create_task( - _kafka.send_messages( - topic="scraper", - producer=producer, - send_queue=send_queue, - shutdown_event=shutdown_event, - ) - ) - - asyncio.create_task( - _kafka.send_messages( - topic="player", - producer=producer, - send_queue=error_queue, - shutdown_event=shutdown_event, - ) - ) - - asyncio.create_task( - _kafka.send_messages( - topic="scraper-runemetrics", - producer=producer, - send_queue=runemetrics_send_queue, - shutdown_event=shutdown_event, - ) - ) - - proxy_list = await get_proxies() - tasks = [] - for proxy in proxy_list: - task = asyncio.create_task( - process_messages( - send_queue=send_queue, - receive_queue=receive_queue, - error_queue=error_queue, - shutdown_event=shutdown_event, - proxy=proxy, - runemetrics_send_queue=runemetrics_send_queue, - ) - ) - tasks.append(task) - # await task for completion (never) - await asyncio.gather(*tasks, return_exceptions=True) - - -if __name__ == "__main__": - try: - loop = asyncio.get_running_loop() - loop.run_until_complete(main()) - except RuntimeError: - asyncio.run(main()) diff --git a/src/main_runemetrics.py b/src/main_runemetrics.py deleted file mode 100644 index 0150333..0000000 --- a/src/main_runemetrics.py +++ /dev/null @@ -1,134 +0,0 @@ -import asyncio -import 
logging -import traceback -import uuid -from asyncio import Event, Queue - -from aiohttp import ClientSession, ClientTimeout - -from config.config import AppConfig -from modules import _kafka -from modules.api.webshare_api import Webshare -from modules.scraper import HighScoreScraper, RuneMetricsScraper, Scraper -from modules.validation.player import Player - -logger = logging.getLogger(__name__) - - -async def scrape( - player: dict, scraper: Scraper, session: ClientSession -) -> tuple[str, str]: - error = None - - try: - player = Player(**player) - player = await scraper.lookup(player=player, session=session) - except Exception as error: - error_type = type(error) - logger.error( - { - "name": scraper.worker_name, - "error_type": error_type.__name__, - "error": error, - "player_name": player, - } - ) - tb_str = traceback.format_exc() - logger.error(f"{error}, \n{tb_str}") - return player, error - - -async def process_messages( - receive_queue: Queue, - send_queue: Queue, - error_queue: Queue, - shutdown_event: Event, - proxy: str, -): - name = str(uuid.uuid4())[-8:] - scraper = RuneMetricsScraper(proxy=proxy, worker_name=name) - timeout = ClientTimeout(total=AppConfig().SESSION_TIMEOUT) - - async with ClientSession(timeout=timeout) as session: - while not shutdown_event.is_set(): - if receive_queue.empty(): - await asyncio.sleep(1) - continue - - data = await receive_queue.get() - receive_queue.task_done() - player, error = await scrape(player=data, scraper=scraper, session=session) - player: Player - - if error is not None: - await error_queue.put(data) - continue - - await send_queue.put({"player": player.dict()}) - logger.info("shutdown") - - -async def get_proxies() -> list: - webshare = Webshare(api_key=AppConfig().PROXY_API_KEY) - proxy_list = await webshare.get_proxies() - logger.info(f"gathered {len(proxy_list)} proxies") - return proxy_list - - -async def main(): - shutdown_event = Event() - consumer = await _kafka.kafka_consumer(topic="scraper-runemetrics", group="scraper") - producer = await _kafka.kafka_producer() - - receive_queue = Queue(maxsize=500) - send_queue = Queue(maxsize=100) - error_queue = Queue(maxsize=500) - - asyncio.create_task( - _kafka.receive_messages( - consumer=consumer, - receive_queue=receive_queue, - shutdown_event=shutdown_event, - ) - ) - - asyncio.create_task( - _kafka.send_messages( - topic="scraper", - producer=producer, - send_queue=send_queue, - shutdown_event=shutdown_event, - ) - ) - - asyncio.create_task( - _kafka.send_messages( - topic="scraper-runemetrics", - producer=producer, - send_queue=error_queue, - shutdown_event=shutdown_event, - ) - ) - proxy_list = await get_proxies() - tasks = [] - for proxy in proxy_list: - task = asyncio.create_task( - process_messages( - send_queue=send_queue, - receive_queue=receive_queue, - error_queue=error_queue, - shutdown_event=shutdown_event, - proxy=proxy, - ) - ) - tasks.append(task) - # await task for completion (never) - await asyncio.gather(*tasks, return_exceptions=True) - - -if __name__ == "__main__": - try: - loop = asyncio.get_running_loop() - loop.run_until_complete(main()) - except RuntimeError: - asyncio.run(main()) diff --git a/src/modules/_kafka.py b/src/modules/_kafka.py deleted file mode 100644 index 40b58d2..0000000 --- a/src/modules/_kafka.py +++ /dev/null @@ -1,186 +0,0 @@ -import asyncio -import functools -import json -import logging -import time -import traceback -from asyncio import Event, Queue - -from aiokafka import AIOKafkaConsumer, AIOKafkaProducer - -from config.config import 
app_config - -logger = logging.getLogger(__name__) - - -def print_traceback(_, error): - error_type = type(error) - logger.error( - { - "error_type": error_type.__name__, - "error": error, - } - ) - tb_str = traceback.format_exc() - logger.error(f"{error}, \n{tb_str}") - - -def retry(max_retries=3, retry_delay=5, on_retry=None, on_failure=None): - def wrapper(func): - @functools.wraps(func) - async def wrapped(*args, **kwargs): - retry_count = 0 - - while retry_count < max_retries: - try: - return await func(*args, **kwargs) - except Exception as e: - if on_retry: - on_retry(retry_count, e) - - retry_count += 1 - logger.error(f"Error: {e}") - logger.info(f"Retrying ({retry_count}/{max_retries})...") - await asyncio.sleep(retry_delay) # Add a delay before retrying - - if on_failure: - on_failure(retry_count) - - raise RuntimeError(f"Failed after {max_retries} retries") - - return wrapped - - return wrapper - - -def log_speed( - counter: int, start_time: float, _queue: Queue, topic: str, interval: int = 15 -) -> tuple[float, int]: - # Calculate the time elapsed since the function started - delta_time = time.time() - start_time - - # Check if the specified interval has not elapsed yet - if delta_time < interval: - # Return the original start time and the current counter value - return start_time, counter - - # Calculate the processing speed (messages per second) - speed = counter / delta_time - - # Log the processing speed and relevant information - log_message = ( - f"{topic=}, qsize={_queue.qsize()}, " - f"processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec" - ) - logger.info(log_message) - - # Return the current time and reset the counter to zero - return time.time(), 0 - - -# async def kafka_consumer_safe(topic: str, group: str, max_retries=3, retry_delay=30): -# retry_count = 0 - -# while retry_count < max_retries: -# try: -# return await kafka_consumer(topic, group) -# except Exception as e: -# retry_count += 1 -# logger.error(f"Error connecting to Kafka: {e}") -# logger.info(f"Retrying Kafka connection ({retry_count}/{max_retries})...") -# await asyncio.sleep(retry_delay) # Add a delay before retrying - -# raise RuntimeError("Failed to connect to Kafka after multiple retries") - - -@retry(max_retries=3, retry_delay=5, on_failure=print_traceback) -async def kafka_consumer(topic: str, group: str): - logger.info(f"Starting consumer, {topic=}, {group=}, {app_config.KAFKA_HOST=}") - - consumer = AIOKafkaConsumer( - topic, - bootstrap_servers=[app_config.KAFKA_HOST], - group_id=group, - value_deserializer=lambda x: json.loads(x.decode("utf-8")), - auto_offset_reset="earliest", - ) - await consumer.start() - logger.info("Started") - return consumer - - -# async def kafka_producer_safe(max_retries=3, retry_delay=30): -# retry_count = 0 - -# while retry_count < max_retries: -# try: -# return await kafka_producer() -# except Exception as e: -# retry_count += 1 -# logger.error(f"Error connecting to Kafka: {e}") -# logger.info(f"Retrying Kafka connection ({retry_count}/{max_retries})...") -# await asyncio.sleep(retry_delay) # Add a delay before retrying - -# raise RuntimeError("Failed to connect to Kafka after multiple retries") - - -@retry(max_retries=3, retry_delay=5, on_failure=print_traceback) -async def kafka_producer(): - logger.info(f"Starting producer") - - producer = AIOKafkaProducer( - bootstrap_servers=[app_config.KAFKA_HOST], - value_serializer=lambda v: json.dumps(v).encode(), - acks="all", - ) - await producer.start() - logger.info("Started") - return producer - - 
-@retry(max_retries=3, retry_delay=5, on_failure=print_traceback) -async def receive_messages( - consumer: AIOKafkaConsumer, - receive_queue: Queue, - shutdown_event: Event, - batch_size: int = 200, -): - while not shutdown_event.is_set(): - batch = await consumer.getmany(timeout_ms=1000, max_records=batch_size) - for tp, messages in batch.items(): - logger.info(f"Partition {tp}: {len(messages)} messages") - await asyncio.gather(*[receive_queue.put(m.value) for m in messages]) - logger.info("done") - await consumer.commit() - - logger.info("shutdown") - - -@retry(max_retries=3, retry_delay=5, on_failure=print_traceback) -async def send_messages( - topic: str, - producer: AIOKafkaProducer, - send_queue: Queue, - shutdown_event: Event, -): - start_time = time.time() - messages_sent = 0 - - while not shutdown_event.is_set(): - start_time, messages_sent = log_speed( - counter=messages_sent, - start_time=start_time, - _queue=send_queue, - topic=topic, - ) - if send_queue.empty(): - await asyncio.sleep(1) - continue - - message = await send_queue.get() - await producer.send(topic, value=message) - send_queue.task_done() - - messages_sent += 1 - - logger.info("shutdown") diff --git a/src/modules/api/highscore_api.py b/src/modules/api/highscore_api.py deleted file mode 100644 index b7c69c8..0000000 --- a/src/modules/api/highscore_api.py +++ /dev/null @@ -1,137 +0,0 @@ -import asyncio -import logging -import time - -from aiohttp import ClientResponse, ClientSession - -from modules.validation.player import Player, PlayerDoesNotExistException -from utils.http_exception_handler import InvalidResponse - -logger = logging.getLogger(__name__) - -hiscore_mapper = { - "league_points": "league", - "clue_scrolls_all": "cs_all", - "clue_scrolls_beginner": "cs_beginner", - "clue_scrolls_easy": "cs_easy", - "clue_scrolls_medium": "cs_medium", - "clue_scrolls_hard": "cs_hard", - "clue_scrolls_elite": "cs_elite", - "clue_scrolls_master": "cs_master", - "theatre_of_blood_hard_mode": "theatre_of_blood_hard", - "tombs_of_amascut_expert_mode": "tombs_of_amascut_expert", -} - - -class HighscoreApi: - def __init__(self, proxy: str = None) -> None: - self.proxy = proxy - self.base_url = ( - "https://secure.runescape.com/m=hiscore_oldschool/index_lite.json" - ) - - async def lookup_hiscores(self, player: Player, session: ClientSession) -> dict: - logger.info(f"Performing hiscores lookup on {player.name}") - url = f"{self.base_url}?player={player.name}" - - async with session.get(url, proxy=self.proxy) as response: - data = await self._handle_response_status(response, player) - - if data is None: - await asyncio.sleep(0.1) - data = await self.lookup_hiscores(player, session) - - hiscore = await self._parse_hiscores(data) - hiscore["Player_id"] = player.id - hiscore["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) - return hiscore - - async def _handle_response_status( - self, response: ClientResponse, player: Player - ) -> dict: - status = response.status - - if response.history and any(resp.status == 302 for resp in response.history): - logger.warning( - f"Redirection occured: {response.url} - {response.history[0].url}" - ) - return None - - basic_error = ( - f"status code {status}.\n" - f"URL: {response.url}\n" - f"Header: {response.headers}\n" - ) - - match status: - # OK - case 200: - return await response.json() - # OK - case 404: - logger.debug(f"{player.name} does not exist on hiscores.") - raise PlayerDoesNotExistException( - f"Player {player.dict()} does not exist" - ) - # NOK, but known - case 
429: - logger.warning(basic_error) - await asyncio.sleep(15) - case s if 500 <= s < 600: - body = await response.text() - logger.warning(basic_error) - if s not in [503]: - logger.warning(f"Body:\n{body}\n") - await asyncio.sleep(5) - case 403: - logger.warning(status) - await asyncio.sleep(5) - # NOK - case _: - body = await response.text() - logger.error( - f"Unhandled status code {status}.\n" - f"URL: {response.url}\n" - f"Header: {response.headers}\n" - f"Body: {body}" - ) - raise InvalidResponse() - - def _parse_hiscore_name(self, name: str) -> str: - name = name.lower() - name = name.replace("'", "") - name = name.replace(" - ", " ") - name = name.replace("-", "_") - name = name.replace(":", "") - name = name.replace("(", "").replace(")", "") - name = name.replace(" ", "_") - # replaces "name" with its corresponding abbreviation from "hiscore_mapper" dictionary, - # if one exists, or keeps the original name if it does not - name = hiscore_mapper.get(name, name) - return name - - def _parse_hiscore_stat(self, stat: int) -> int: - stat = 0 if stat == -1 else stat - return stat - - async def _parse_hiscores(self, hiscore: dict) -> dict: - # Extract skill data from hiscore dictionary and create a new dictionary - skill_stats = { - self._parse_hiscore_name(s["name"]): self._parse_hiscore_stat(s["xp"]) - for s in hiscore.get("skills") - if s["name"] != "Overall" - } - - # Calculate the sum of all skills and add it to the skills dictionary - skill_stats["total"] = sum( - [v for k, v in skill_stats.items() if k not in ("total", "overall")] - ) - - # Extract activity data from hiscore dictionary and create a new dictionary - activity_stats = { - self._parse_hiscore_name(a["name"]): self._parse_hiscore_stat(a["score"]) - for a in hiscore.get("activities") - } - - # Merge the skills and activities dictionaries and return the result - return skill_stats | activity_stats diff --git a/src/modules/api/runemetrics_api.py b/src/modules/api/runemetrics_api.py deleted file mode 100644 index 4c5444b..0000000 --- a/src/modules/api/runemetrics_api.py +++ /dev/null @@ -1,98 +0,0 @@ -import asyncio -import logging -import time - -from aiohttp import ClientResponse, ClientSession - -from modules.validation.player import Player -from utils.http_exception_handler import InvalidResponse - -logger = logging.getLogger(__name__) - - -class RuneMetricsApi: - def __init__(self, proxy: str = None) -> None: - self.proxy = proxy - self.base_url = "https://apps.runescape.com/runemetrics/profile/profile" - - async def lookup_runemetrics(self, player: Player, session: ClientSession) -> dict: - """ - Performs a RuneMetrics lookup on the given player. - - :param player: a dictionary containing the player's name and id - :return: a dictionary containing the player's RuneMetrics data - """ - player_name = player.name - base_url = "https://apps.runescape.com/runemetrics/profile/profile" - url = f"{base_url}?user={player_name}" - - async with session.get(url, proxy=self.proxy) as response: - data: dict = await self._handle_response_status(response, player) - - if data is None: - await asyncio.sleep(0.1) - return await self.lookup_runemetrics(player, session) - - logger.info(f"found {player_name} on runemetrics") - - match data.get("error"): - case "NO_PROFILE": - # username is not associated to an account - player.label_jagex = 1 - case "NOT_A_MEMBER": - # account is perm banned - player.label_jagex = 2 - case "PROFILE_PRIVATE": - # runemetrics is set to private. either they're too low level or they're banned. 
- player.label_jagex = 3 - case _: - # account is active, probably just too low stats for hiscores - player.label_jagex = 0 - # API assigns this too, but just being safe - player.updated_at = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) - return player - - async def _handle_response_status( - self, response: ClientResponse, player: Player - ) -> dict: - status = response.status - - if response.history and any(resp.status == 302 for resp in response.history): - logger.warning( - f"Redirection occured: {response.url} - {response.history[0].url}" - ) - return None - - basic_error = ( - f"status code {status}.\n" - f"URL: {response.url}\n" - f"Header: {response.headers}\n" - f"Player: {player.name}\n" - ) - - match status: - # OK - case 200: - return await response.json() - # NOK, but known - case 429: - logger.warning(basic_error) - await asyncio.sleep(15) - case s if 500 <= s < 600: - body = await response.text() - logger.warning(basic_error) - if s not in [503]: - logger.warning(f"Body:\n{body}\n") - await asyncio.sleep(5) - case 403: - logger.warning(status) - await asyncio.sleep(5) - # NOK - case _: - body = await response.text() - logger.error( - f"Unhandled status code {status}.\n" - f"Header: {response.headers}\n" - f"Body: {body}" - ) - raise InvalidResponse() diff --git a/src/modules/gracefull_shutdown.py b/src/modules/gracefull_shutdown.py deleted file mode 100644 index 2e69ef2..0000000 --- a/src/modules/gracefull_shutdown.py +++ /dev/null @@ -1,20 +0,0 @@ -import asyncio -import logging -import signal - -logger = logging.getLogger(__name__) - - -class GracefulShutdown: - def __init__(self, shutdown_event: asyncio.Event, shutdown_sequence): - signal.signal(signal.SIGINT, self.exit_gracefully) - signal.signal(signal.SIGTERM, self.exit_gracefully) - self.shutdown_event = shutdown_event - self.shutdown_sequence = shutdown_sequence - - def exit_gracefully(self, signum, frame): - logger.info("shutdown") - self.shutdown_event.set() - - loop = asyncio.get_event_loop() - loop.create_task(self.shutdown_sequence) diff --git a/src/modules/scraper.py b/src/modules/scraper.py deleted file mode 100644 index 643ddc5..0000000 --- a/src/modules/scraper.py +++ /dev/null @@ -1,142 +0,0 @@ -import asyncio -import logging -import time -from collections import deque -from typing import Union - -from aiohttp import ClientSession - -from modules.api.highscore_api import HighscoreApi -from modules.api.runemetrics_api import RuneMetricsApi -from modules.validation.player import Player, PlayerDoesNotExistException - -logger = logging.getLogger(__name__) - - -class Scraper: - def __init__( - self, proxy: str, worker_name: str, calls_per_minute: int = 60 - ) -> None: - self.proxy = proxy - self.worker_name = worker_name - self.history = deque(maxlen=calls_per_minute) - self.highscore_api = HighscoreApi(proxy=proxy) - self.runemetrics_api = RuneMetricsApi(proxy) - self.sleeping = False - - async def rate_limit(self): - """ - Rate limits the scraper to 60 calls a minute. 
- """ - self.history.append(int(time.time())) - maxlen = self.history.maxlen - MINUTE = 60 - if not len(self.history) == maxlen: - return - - head = self.history[0] - tail = self.history[-1] - span = tail - head - - if span < MINUTE: - sleep = MINUTE - span - if sleep % 10 == 0: - logger.warning( - f"{self.worker_name} - Rate limit reached, sleeping {sleep} seconds" - ) - self.sleeping = True - await asyncio.sleep(sleep) - self.sleeping = False - return - - async def lookup( - self, player: Player, session: ClientSession - ) -> Union[Player, dict]: - ... - - # async def lookup( - # self, player: Player, session: ClientSession - # ) -> Union[Player, dict]: - # await self.rate_limit() - # highscore = None - # try: - # highscore = await self.highscore_api.lookup_hiscores( - # player=player, session=session - # ) - # player.possible_ban = 0 - # player.confirmed_ban = 0 - # player.label_jagex = 0 - # except PlayerDoesNotExistException: - # player.possible_ban = 1 - # player.confirmed_player = 0 - # player = await self.runemetrics_api.lookup_runemetrics( - # player=player, session=session - # ) - - # player.updated_at = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) - # return player, highscore - - # async def lookup_hiscores( - # self, player: Player, session: ClientSession - # ) -> Union[Player, dict]: - # await self.rate_limit() - # highscore = None - # try: - # highscore = await self.highscore_api.lookup_hiscores( - # player=player, session=session - # ) - # player.possible_ban = 0 - # player.confirmed_ban = 0 - # player.label_jagex = 0 - # except PlayerDoesNotExistException: - # logger.warn(msg=f"{player.name} does not exist") - - # player.updated_at = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) - # return player, highscore - - # async def lookup_runemetrics(self, player: Player, session: ClientSession) -> dict: - # await self.rate_limit() - # return await self.runemetrics_api.lookup_runemetrics( - # player=player, session=session - # ) - - -class HighScoreScraper(Scraper): - def __init__( - self, proxy: str, worker_name: str, calls_per_minute: int = 60 - ) -> None: - super().__init__(proxy, worker_name, calls_per_minute) - self.highscore_api = HighscoreApi(proxy=proxy) - - async def lookup( - self, player: Player, session: ClientSession - ) -> Union[Player, dict]: - await self.rate_limit() - highscore = None - try: - highscore = await self.highscore_api.lookup_hiscores( - player=player, session=session - ) - player.possible_ban = 0 - player.confirmed_ban = 0 - player.label_jagex = 0 - except PlayerDoesNotExistException: - player.possible_ban = 1 - player.confirmed_player = 0 - - player.updated_at = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) - return player, highscore - - -class RuneMetricsScraper(Scraper): - def __init__( - self, proxy: str, worker_name: str, calls_per_minute: int = 60 - ) -> None: - super().__init__(proxy, worker_name, calls_per_minute) - self.runemetrics_api = RuneMetricsApi(proxy) - - async def lookup(self, player: Player, session: ClientSession) -> dict: - await self.rate_limit() - return await self.runemetrics_api.lookup_runemetrics( - player=player, session=session - ) diff --git a/src/modules/validation/player.py b/src/modules/validation/player.py deleted file mode 100644 index 69dcde7..0000000 --- a/src/modules/validation/player.py +++ /dev/null @@ -1,17 +0,0 @@ -from pydantic import BaseModel - - -class PlayerDoesNotExistException(Exception): - pass - - -class Player(BaseModel): - id: int - name: str - created_at: str - updated_at: str | None - 
possible_ban: int
-    confirmed_ban: int
-    confirmed_player: int
-    label_id: int
-    label_jagex: int
diff --git a/src/runemetrics/__init__.py b/src/runemetrics/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/runemetrics/api.py b/src/runemetrics/api.py
new file mode 100644
index 0000000..681a455
--- /dev/null
+++ b/src/runemetrics/api.py
@@ -0,0 +1,64 @@
+import logging
+
+from aiohttp import ClientSession
+from osrs.exceptions import Undefined, UnexpectedRedirection
+from osrs.utils.ratelimiter import RateLimiter
+
+logger = logging.getLogger(__name__)
+
+
+class RuneMetrics:
+    BASE_URL = "https://apps.runescape.com/runemetrics"
+
+    def __init__(
+        self, proxy: str = "", rate_limiter: RateLimiter = RateLimiter()
+    ) -> None:
+        self.proxy = proxy
+        self.rate_limiter = rate_limiter
+
+    async def _make_request(self, session: ClientSession, url: str, params: dict):
+        async with session.get(url, proxy=self.proxy, params=params) as response:
+            # when the HS are down it will redirect to the main page.
+            # after redirection it still returns a 200, so we must check for redirection first
+            if response.history and any(r.status == 302 for r in response.history):
+                raise UnexpectedRedirection(
+                    f"Redirection occurred: {response.url} - {response.history[0].url}"
+                )
+            elif response.status != 200:
+                # raises ClientResponseError
+                response.raise_for_status()
+                raise Undefined()
+            return await response.json()
+
+    async def get_profile(
+        self, session: ClientSession, player_name: str, activities: int = 0
+    ) -> dict:
+        """
+        activities: number of recent activities to include for that player
+        """
+        await self.rate_limiter.check()
+
+        params = {"user": player_name, "activities": activities}
+        params = {k: v for k, v in params.items() if v}
+        url = f"{self.BASE_URL}/profile/profile"
+        data = await self._make_request(session=session, url=url, params=params)
+        return data
+
+    async def get_monthly_xp(
+        self, session: ClientSession, player_name: str, skill_id: int | None = None
+    ) -> dict:
+        await self.rate_limiter.check()
+
+        params = {"searchName": player_name, "skillid": skill_id}
+        params = {k: v for k, v in params.items() if v}
+
+        url = f"{self.BASE_URL}/xp-monthly"
+        data = await self._make_request(session=session, url=url, params=params)
+        return data
+
+    async def get_quests(self, session: ClientSession, player_name: str) -> dict:
+        await self.rate_limiter.check()
+        params = {"user": player_name}
+        url = f"{self.BASE_URL}/quests"
+        data = await self._make_request(session=session, url=url, params=params)
+        return data
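A minimal usage sketch for the new client (class and endpoints as defined in `src/runemetrics/api.py` above; the player name is just an example, and this assumes `src/` is on `PYTHONPATH`):

```python
import asyncio

from aiohttp import ClientSession

from runemetrics.api import RuneMetrics


async def demo():
    api = RuneMetrics()  # no proxy, default RateLimiter
    async with ClientSession() as session:
        profile = await api.get_profile(session=session, player_name="Zezima")
        # a missing/private profile comes back with an "error" key,
        # which the worker below maps onto label_jagex
        print(profile.get("error", "profile found"))


if __name__ == "__main__":
    asyncio.run(demo())
```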
documentation="Errors in RuneMetrics requests", + labelnames=["proxy"], +) +latency_histogram = Histogram( + name="rune_metrics_latency", + documentation="Latency of RuneMetrics requests", + labelnames=["proxy"], +) + + +class Player(BaseModel): + id: int + name: str + created_at: str | None + updated_at: str | None + possible_ban: int + confirmed_ban: int + confirmed_player: int + label_id: int + label_jagex: int + + +class RuneMetricsWorkerManager: + def __init__(self, proxies, kafka_servers, kafka_topic, kafka_group): + self.proxies = proxies + self.kafka_servers = kafka_servers + self.kafka_topic = kafka_topic + self.kafka_group = kafka_group + self.player_queue = asyncio.Queue(maxsize=10) + self.semaphore = asyncio.Semaphore(len(proxies)) + self.kafka_producer: AIOKafkaProducer = None + self.kafka_consumer: AIOKafkaConsumer = None + + async def setup_kafka_consumer(self): + if self.kafka_consumer: + logger.warning("Consumer already started") + return + + self.kafka_consumer = AIOKafkaConsumer( + self.kafka_topic, + bootstrap_servers=self.kafka_servers, + group_id=self.kafka_group, + value_deserializer=lambda x: json.loads(x.decode("utf-8")), + auto_offset_reset="earliest", + ) + await self.kafka_consumer.start() + + async def setup_kafka_producer(self): + if self.kafka_producer: + logger.warning("Producer already started") + return + + self.kafka_producer = AIOKafkaProducer( + bootstrap_servers=self.kafka_servers, + ) + await self.kafka_producer.start() + + async def produce_message(self, topic: str, msg: dict): + """Produce player data to Kafka.""" + if not self.kafka_producer: + raise Exception("Kafka Producer is None") + + if not isinstance(msg, dict): + raise Exception(f"Expected type dict, got {type(msg)}") + + await self.kafka_producer.send_and_wait( + topic=topic, + value=msg, + ) + + async def produce_player(self, topic: str, player: Player): + """Produce player data to Kafka.""" + topics = ["scraper-runemetrics"] + if topic not in topics: + err = f"Unsupported Topic, received: {topic} expected value in {topics=}" + raise Exception(err) + + if not isinstance(player, Player): + raise Exception(f"Expected type Player, got {type(player)}") + + await self.produce_message( + topic=topic, + value=player.model_dump(), + ) + + async def consume_player(self): + """Consumes player data from Kafka.""" + if not self.kafka_consumer: + raise Exception("Kafka Consumer is None") + + async for msg in self.kafka_consumer: + player = Player(**msg.value) + await self.player_queue.put(player) + + async def fetch_player_profile( + self, session: ClientSession, player: Player, metrics_instance: RuneMetrics + ): + username = player.name + proxy = session._proxy + try: + start_time = time.time() + data: dict = await metrics_instance.get_profile(player_name=username) + latency = time.time() - start_time + + # process logic + player.updated_at = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) + match data.get("error"): + # username is not associated to an account + case "NO_PROFILE": + player.label_jagex = 1 + # account is perm banned + case "NOT_A_MEMBER": + player.label_jagex = 2 + # runemetrics is set to private. either they're too low level or they're banned. 
+ case "PROFILE_PRIVATE": + player.label_jagex = 3 + # account is active, probably just too low stats for hiscores + case _: + player.label_jagex = 0 + + msg = {"player": player.model_dump()} + await self.produce_message(topic="scraper", msg=msg) + + # Increment success counter + success_counter.labels(proxy=proxy).inc() + + # Record latency, labeled by proxy + latency_histogram.labels(proxy=proxy).observe(latency) + + except UnexpectedRedirection as e: + error_counter.labels(proxy=proxy).inc() + logger.error(f"Highscore page down for {username}: {e}") + await self.produce_player(topic=KAFKA_TOPIC_RUNEMETRICS, player=player) + await asyncio.sleep(120) + except Undefined as e: + error_counter.labels(proxy=proxy).inc() + await self.produce_player(topic=KAFKA_TOPIC_RUNEMETRICS, player=player) + logger.error(e) + raise Exception(e) # Fail loud + + except ClientResponseError as e: + error_counter.labels(proxy=proxy).inc() + await self.produce_player(topic=KAFKA_TOPIC_RUNEMETRICS, player=player) + logger.error(e) + raise Exception(e) # Fail loud + finally: + return + + async def worker(self, proxy): + limiter = RateLimiter(calls_per_interval=60, interval=60) + hiscore_instance = RuneMetrics(proxy=proxy, rate_limiter=limiter) + + async with ClientSession() as session: + # Store proxy for latency tracking + session._proxy = proxy.split("@")[1] + while True: + player = await self.player_queue.get() + self.player_queue.task_done() + # TODO: Stop signal + if player is None: + break + + async with self.semaphore: + await self.fetch_player_profile(session, player, hiscore_instance) + + async def start_workers(self): + self.tasks = [asyncio.create_task(self.worker(p)) for p in self.proxies] + + async def stop_workers(self): + await self.kafka_consumer.stop() + self.kafka_consumer = None + + while any([t for t in self.tasks if t.done()]): + await self.player_queue.put(None) + l_tasks = len([t for t in self.tasks if t.done()]) + logger.info(f"Shutdown: running tasks: {l_tasks}") + + async def run(self): + await self.setup_kafka_consumer() + await self.setup_kafka_producer() + + try: + await self.start_workers() + await self.consume_player() + await self.stop_workers() + finally: + if self.kafka_consumer: + await self.kafka_consumer.stop() + if self.kafka_producer: + await self.kafka_producer.stop() + + +async def main(): + proxy_list = await Webshare(api_key=app_config.PROXY_API_KEY).get_proxies() + logger.info(f"gathered {len(proxy_list)} proxies") + + # Start Prometheus metrics server + start_http_server(8000) + + runemetrics_manager = RuneMetricsWorkerManager( + proxies=proxy_list, + kafka_servers=[app_config.KAFKA_HOST], + kafka_topic=KAFKA_TOPIC_RUNEMETRICS, + kafka_group="scraper", + ) + + await runemetrics_manager.run() + + +# Run the asynchronous main function +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/utils/http_exception_handler.py b/src/utils/http_exception_handler.py deleted file mode 100644 index c2a4bab..0000000 --- a/src/utils/http_exception_handler.py +++ /dev/null @@ -1,44 +0,0 @@ -import asyncio -import logging -import traceback - -from aiohttp.client_exceptions import ( - ClientConnectorError, - ClientOSError, - ContentTypeError, - ServerDisconnectedError, - ServerTimeoutError, -) - -from modules.validation.player import PlayerDoesNotExistException - - -class InvalidResponse(Exception): - pass - - -def http_exception_handler(func): - async def wrapper(*args, **kwargs): - logger = logging.getLogger(func.__name__) - try: - result = await func(*args, **kwargs) - 
return result - except ( - ServerTimeoutError, - ServerDisconnectedError, - ClientConnectorError, - ContentTypeError, - ClientOSError, - ) as e: - logger.error(f"{e}") - raise InvalidResponse(f"{e}") - except PlayerDoesNotExistException: - raise PlayerDoesNotExistException() - except Exception as e: - # get the stack trace as a string - tb_str = traceback.format_exc() - logger.error(f"{e}, \n{tb_str}") - await asyncio.sleep(10) - return None - - return wrapper diff --git a/src/utils/timer.py b/src/utils/timer.py deleted file mode 100644 index 99c6a3a..0000000 --- a/src/utils/timer.py +++ /dev/null @@ -1,16 +0,0 @@ -import logging -import time - -logger = logging.getLogger(__name__) - - -def timer(func): - async def wrapper(*args, **kwargs): - start_time = time.perf_counter() - result = await func(*args, **kwargs) - end_time = time.perf_counter() - run_time = end_time - start_time - logger.debug(f"{func.__name__} took {run_time:.4f} seconds") - return result - - return wrapper
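With the compose port mappings above (`8000:8000` for the hiscore worker, `8001:8000` for runemetrics), here is a quick smoke test of both Prometheus endpoints; a hedged sketch, assuming the stack was started with `docker compose up --build -d` (or locally via `python src/main.py --run-hs-worker` / `--run-rm-worker`):

```python
from urllib.request import urlopen

# ports from docker-compose.yml: 8000 -> hiscore worker, 8001 -> runemetrics
for port in (8000, 8001):
    with urlopen(f"http://localhost:{port}/metrics", timeout=5) as resp:
        body = resp.read().decode()
    # prometheus_client exposes Counters with a *_total suffix
    assert "highscore_request_count" in body or "rune_metrics_success" in body
    print(f"port {port}: metrics endpoint OK")
```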