Skip to content

Commit

Permalink
Refactor Nips class and add batch processing in test_celery_worker
Browse files Browse the repository at this point in the history
  • Loading branch information
AndPuQing committed Jan 20, 2024
1 parent a91a807 commit 991bbc0
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 18 deletions.
7 changes: 3 additions & 4 deletions backend/app/app/source/NIPS.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@


class Nips(PaperRequestsTask):
def __init__(self):
url: str = "https://nips.cc/Conferences/2023/Schedule?type=Poster"
super().__init__(url)
url: str = "https://nips.cc/Conferences/2023/Schedule?type=Poster"
name: str = "Nips"

@staticmethod
def get_urls(response: HtmlResponse) -> list[str]:
def parse_urls(response: HtmlResponse) -> list[str]:
poster_ids = response.css(".maincard::attr(id)").getall()
urls = [
f"https://nips.cc/Conferences/2023/Schedule?showEvent={poster_id.replace('maincard_', '')}"
Expand Down
4 changes: 4 additions & 0 deletions backend/app/app/source/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from app.source.base import PaperRequestsTask
from app.source.NIPS import Nips

__all__ = ["PaperRequestsTask", "Nips"]
19 changes: 11 additions & 8 deletions backend/app/app/source/base.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
import logging
from abc import abstractmethod

import requests
from celery import Task
from scrapy.http import HtmlResponse


class PaperRequestsTask(Task):
def __init__(self, url):
self.url = url
url: str

# abstractmethod has to be the innermost decorator: a staticmethod object does
# not allow __isabstractmethod__ to be assigned, so the reverse order fails
# when the class body is executed.
@staticmethod
@abstractmethod
def parse_urls(response: HtmlResponse) -> list[str]:
    """Extract the URLs to fetch from a listing page; subclasses must override.

    Args:
        response: The fetched listing page, as handed over by ``get_urls``.

    Returns:
        A list of absolute URLs.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError

@classmethod
def get_urls(cls) -> list[str]:
    """Fetch ``cls.url`` and return the URLs that ``parse_urls`` extracts from it.

    Returns:
        The result of ``cls.parse_urls`` on the fetched page, or an empty
        list when ``cls.requestx`` returns ``None``.
    """
    listing_page = cls.requestx(cls.url)
    return [] if listing_page is None else cls.parse_urls(listing_page)

@staticmethod
@abstractmethod
def parse(response: HtmlResponse) -> dict[str, str]:
    """Turn one fetched page into a dict of string fields; subclasses must override.

    Args:
        response: The fetched page to parse.

    Returns:
        A ``dict[str, str]``; the expected field names are not specified here.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError

@staticmethod
def request(url: str) -> HtmlResponse | None:
def requestx(url: str) -> HtmlResponse | None:
try:
response = requests.get(url)
response.raise_for_status()
Expand All @@ -35,7 +38,7 @@ def request(url: str) -> HtmlResponse | None:
def run(self, urls: list[str]):
results = []
for url in urls:
response = PaperRequestsTask.request(url)
response = PaperRequestsTask.requestx(url)
if response is None:
continue
results.append(self.parse(response))
Expand Down
17 changes: 11 additions & 6 deletions backend/app/app/worker.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import logging

from app.core.celery_app import celery_app
from app.core.config import settings
from app.source import Nips

celery_app.register_task(Nips())


def run_paper_requests_task(source: str):
pass
def batch(iterable, n=1):
    """Yield consecutive slices of ``iterable``, each holding at most ``n`` items.

    Args:
        iterable: A sized, sliceable sequence (it is passed to ``len`` and
            sliced), e.g. a list of URLs.
        n: Maximum number of items per slice; must be at least 1.

    Yields:
        Slices of ``iterable`` in order; the last one may be shorter than ``n``.

    Raises:
        ValueError: If ``n`` is less than 1. Because this is a generator, the
            error surfaces when iteration starts, not when ``batch`` is called.
    """
    if n < 1:
        # A zero step makes range() fail with an unrelated message, and a
        # negative step would silently yield nothing.
        raise ValueError(f"batch size must be >= 1, got {n}")
    # Slicing clamps at the end of the sequence, so no explicit upper bound
    # is needed.
    for start in range(0, len(iterable), n):
        yield iterable[start : start + n]


@celery_app.task(acks_late=True)
def test_celery_worker(word: str) -> None:
    """Log a heartbeat, then queue the URLs from ``Nips.get_urls()`` in batches.

    Each batch holds at most ``settings.REQUESTS_BATCH_SIZE`` URLs and is sent
    to the task named "Nips" as its ``urls`` keyword argument.

    Args:
        word: Value echoed in the "DONE" log line.
    """
    logging.info("Celery worker is working")
    logging.info(f"DONE: {word}")
    poster_urls = Nips.get_urls()
    # Each loop item is a slice of URLs, not a single URL.
    for url_chunk in batch(poster_urls, settings.REQUESTS_BATCH_SIZE):
        celery_app.send_task("Nips", kwargs={"urls": url_chunk})

0 comments on commit 991bbc0

Please sign in to comment.