Add echo flag to PostgreSQL engine and implement CrawledItem model
AndPuQing committed Jan 21, 2024
1 parent 2b7108b commit ec85068
Showing 5 changed files with 106 additions and 14 deletions.
1 change: 1 addition & 0 deletions backend/app/app/db/engine.py
@@ -4,4 +4,5 @@
 
 engine = create_engine(
     f"postgresql+psycopg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@bemore-db/{settings.POSTGRES_DB}",
+    echo=True,
 )
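For context, echo=True makes SQLAlchemy log every SQL statement the engine emits to the sqlalchemy.engine logger, which is useful in development but verbose in production. A minimal sketch of the effect, using an in-memory SQLite URL as a stand-in for the bemore-db DSN above:

```python
from sqlalchemy import text
from sqlmodel import create_engine

# echo=True routes every emitted statement to the
# "sqlalchemy.engine" logger at INFO level.
engine = create_engine("sqlite://", echo=True)  # in-memory stand-in

with engine.connect() as conn:
    conn.execute(text("SELECT 1"))  # logged as: SELECT 1
```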
14 changes: 12 additions & 2 deletions backend/app/app/models.py
@@ -1,4 +1,5 @@
 # Contents of JWT token
+from datetime import datetime
 from typing import Optional, Union
 
 from pydantic import BaseModel, EmailStr, HttpUrl
@@ -55,7 +56,7 @@ class UserOut(UserBase):
 # Shared properties
 class ItemBase(SQLModel):
     title: str
-    description: str
+    abstract: str
     keywords: Union[list[str], None] = Field(
         default=None,
         sa_column=Column(JSON),
@@ -71,7 +72,7 @@ class ItemCreate(ItemBase):
 # Properties to receive on item update
 class ItemUpdate(ItemBase):
     title: Optional[str] = None
-    description: Optional[str] = None
+    abstract: Optional[str] = None
     keywords: Optional[list[str]] = None
     raw_url: Optional[HttpUrl] = None
     is_hidden: Optional[bool] = None
@@ -89,6 +90,15 @@ class ItemOut(ItemBase):
     id: int
 
 
+class CrawledItem(SQLModel, table=True):
+    id: int = Field(default=None, primary_key=True)
+    raw_url: str = Field(nullable=False)
+    last_crawled: datetime = Field(
+        default_factory=datetime.utcnow,
+        nullable=False,
+    )
+
+
 class TokenPayload(BaseModel):
     sub: Union[int, None] = None
 
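To see the new model in action, here is a minimal, self-contained sketch of recording and re-reading crawled URLs. The SQLite engine and the sample URL are placeholders for the demo, and id is annotated Optional[int] here for type-checker friendliness (the commit annotates it as plain int):

```python
from datetime import datetime
from typing import Optional

from sqlmodel import Field, Session, SQLModel, create_engine, select

# Stand-alone mirror of the CrawledItem model added above, for the demo only.
class CrawledItem(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    raw_url: str = Field(nullable=False)
    last_crawled: datetime = Field(default_factory=datetime.utcnow, nullable=False)

engine = create_engine("sqlite://")  # in-memory stand-in for bemore-db
SQLModel.metadata.create_all(engine)

with Session(engine) as session:
    session.add(CrawledItem(raw_url="https://papers.nips.cc/paper/1"))  # placeholder URL
    session.commit()
    crawled = set(session.exec(select(CrawledItem.raw_url)).all())
    print(crawled)  # {'https://papers.nips.cc/paper/1'}
```

This is the same select(CrawledItem.raw_url) pattern the new worker task uses below to compute its cache hit rate.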
46 changes: 42 additions & 4 deletions backend/app/app/source/base.py
@@ -3,10 +3,23 @@
 import requests
 from celery import Task
 from scrapy.http import HtmlResponse
+from sqlmodel import Session
+
+from app.models import CrawledItem, Item
 
 
 class PaperRequestsTask(Task):
     url: str
+    ignore_result: bool = True
+
+    @property
+    def db(self):
+        """
+        Lazy loading of database connection.
+        """
+        from app.db.engine import engine
+
+        return Session(engine)
 
     @staticmethod
     def parse_urls(response: HtmlResponse) -> list[str]:
@@ -15,18 +28,21 @@ def parse_urls(response: HtmlResponse) -> list[str]:
 
     @classmethod
     def get_urls(cls) -> list[str]:
-        response = cls.requestx(cls.url)
+        response = cls._request(cls.url)
         if response is None:
             return []
         return cls.parse_urls(response)
 
     @staticmethod
     def parse(response: HtmlResponse) -> dict[str, str]:
         # subclasses must return a dict with the fields:
         # title, abstract, url
         raise NotImplementedError
 
     @staticmethod
-    def requestx(url: str) -> HtmlResponse | None:
+    def _request(
+        url: str,
+    ) -> HtmlResponse | None:  # renamed: the base Task class already defines a `request` method
         try:
             response = requests.get(url)
             response.raise_for_status()
@@ -35,11 +51,33 @@ def requestx(url: str) -> HtmlResponse | None:
             return
         return HtmlResponse(url=url, body=response.content, encoding="utf-8")
 
+    def save(self, data: list[dict[str, str]]) -> None:
+        with self.db as db:
+            objs = [
+                CrawledItem(
+                    raw_url=item["url"],
+                )
+                for item in data
+            ]
+            db.add_all(objs)
+            db.commit()
+
+            objs = [
+                Item(
+                    title=item["title"],
+                    abstract=item["abstract"],
+                    raw_url=item["url"],
+                )
+                for item in data
+            ]
+            db.add_all(objs)
+            db.commit()
+
     def run(self, urls: list[str]):
         results = []
         for url in urls:
-            response = PaperRequestsTask.requestx(url)
+            response = PaperRequestsTask._request(url)
             if response is None:
                 continue
             results.append(self.parse(response))
-        return results
+        self.save(results)
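For orientation, here is a sketch of what a concrete source subclass of PaperRequestsTask might look like. The listing URL and CSS selectors are invented for illustration and are not the project's actual Nips implementation; only parse_urls and parse must be supplied, since get_urls, run, and save come from the base class:

```python
from scrapy.http import HtmlResponse

from app.source.base import PaperRequestsTask

class Nips(PaperRequestsTask):
    name = "Nips"                       # Celery task name used by send_task
    url = "https://papers.nips.cc"      # listing page (assumed)

    @staticmethod
    def parse_urls(response: HtmlResponse) -> list[str]:
        # Selector is illustrative, not the project's actual one.
        return response.css("a.paper-link::attr(href)").getall()

    @staticmethod
    def parse(response: HtmlResponse) -> dict[str, str]:
        # Must return the fields save() expects: title, abstract, url.
        return {
            "title": response.css("h1::text").get(default=""),
            "abstract": response.css("p.abstract::text").get(default=""),
            "url": response.url,
        }
```

As in worker.py below, an instance would be registered with celery_app.register_task(Nips()) and then invoked by name via celery_app.send_task("Nips", ...).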
56 changes: 49 additions & 7 deletions backend/app/app/worker.py
@@ -1,18 +1,60 @@
+import logging
+from typing import Union
+
+from celery import Task
+from sqlmodel import Session, select
+
 from app.core.celery_app import celery_app
 from app.core.config import settings
+from app.models import CrawledItem
 from app.source import Nips
 
 celery_app.register_task(Nips())
 
 
-def batch(iterable, n=1):
-    l = len(iterable)
-    for ndx in range(0, l, n):
-        yield iterable[ndx : min(ndx + n, l)]
+def batch(iterable: Union[set[str], list[str]], n: int = 1):
+    """
+    Batch generator.
+    :param iterable: iterable object.
+    :param n: batch size.
+    """
+    iterable = list(iterable)
+    length = len(iterable)
+    for ndx in range(0, length, n):
+        yield iterable[ndx : min(ndx + n, length)]
+
+
+class DatabaseTask(Task):
+    @property
+    def db(self) -> Session:
+        """
+        Lazy loading of database connection.
+        """
+        from app.db.engine import engine
+
+        return Session(engine)
 
 
-@celery_app.task(acks_late=True)
-def test_celery_worker(word: str) -> None:
-    urls = Nips.get_urls()
+@celery_app.task(
+    acks_late=True,
+    base=DatabaseTask,
+    bind=True,
+    ignore_result=True,
+)
+def test_celery_worker(self: DatabaseTask, word: str) -> None:
+    urls = set(Nips.get_urls())
+
+    # drop URLs that are already recorded in the database
+    with self.db as db:
+        crawled = set(db.exec(select(CrawledItem.raw_url)).all())
+    new_urls = urls - crawled
+
+    # log the cache hit rate
+    cache_hit_rate = (len(urls) - len(new_urls)) / len(urls) if urls else 0
+    logging.info(f"Cache hit rate: {cache_hit_rate * 100:.2f}%")
+
     for url in batch(urls, settings.REQUESTS_BATCH_SIZE):
         celery_app.send_task("Nips", kwargs={"urls": url})
3 changes: 2 additions & 1 deletion docker-compose-dev.yml
@@ -25,14 +25,15 @@ services:
       PGADMIN_CONFIG_SERVER_MODE: 'False'
     networks:
       - default
-
   queue:
     container_name: bemore-queue
     image: rabbitmq:3-management
     # Using the below image instead is required to enable the "Broker" tab in the flower UI:
     # image: rabbitmq:3-management
     #
     # You also have to change the flower command
+    ports:
+      - "15672:15672"
 
   flower:
     container_name: bemore-flower
