Fixed #10: Request Task to farm
- Updated ArchiveConfig to use a single language (we'll only allow single-language ZIMs for now)
- New zimfarm module for ZF API communication
- New endpoint /{project_id}/archives/{archive_id}/request to request a ZIM to be created by zimfarm (the schedule/task sequence is sketched after this list)
  - generates collection.json based on files in project
  - uploads collection.json to S3
  - calls zimfarm to create a dedicated, manual schedule (passing a webhook URL)
  - calls zimfarm to request a task for this schedule
  - calls zimfarm to delete schedule
  - records ZF task_id and status change in DB
- New email sending capability via Mailgun API
- DB Archive Model has new completed_on property
- New endpoint /{project_id}/archives/{archive_id}/hook for the zimfarm to inform about status changes
  - records update in DB
  - sends notification emails built from templates
- Lots of new configuration points (via environment variables)
- Reorganized constants by feature
- Reorganized some utils functions into utils modules
- Updated alembic post-write hooks to use ruff in place of the former black + isort pair
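
For orientation, below is a minimal sketch of the create-schedule → request-task → delete-schedule sequence described above. It is illustrative only: the zimfarm endpoint paths, payload shapes, and response keys shown here are assumptions, not the API of the new zimfarm module.

```python
# Illustrative sketch only: endpoint paths, payloads and response keys are
# assumptions, not the actual zimfarm module added by this commit.
import requests

from api.constants import constants

TIMEOUT = constants.zimfarm_request_timeout_sec


def request_zim_task(schedule_name: str, schedule_payload: dict) -> str:
    """Create a dedicated manual schedule, request a task, drop the schedule."""
    base = constants.zimfarm_api_url

    # authenticate against the zimfarm API (path is an assumption)
    resp = requests.post(
        f"{base}/auth/authorize",
        data={
            "username": constants.zimfarm_username,
            "password": constants.zimfarm_password,
        },
        timeout=TIMEOUT,
    )
    resp.raise_for_status()
    headers = {"Authorization": f"Token {resp.json()['access_token']}"}

    # one-shot schedule carrying the collection.json URL and the webhook URL
    requests.post(
        f"{base}/schedules", json=schedule_payload, headers=headers, timeout=TIMEOUT
    ).raise_for_status()

    resp = requests.post(
        f"{base}/requested-tasks",
        json={"schedule_names": [schedule_name]},
        headers=headers,
        timeout=TIMEOUT,
    )
    resp.raise_for_status()
    task_id = resp.json()["requested"][0]  # assumed response shape

    # the schedule is single-use; remove it once the task is queued
    requests.delete(
        f"{base}/schedules/{schedule_name}", headers=headers, timeout=TIMEOUT
    )
    return task_id  # recorded in the Archive row along with the status change
```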
rgaudin committed Jun 7, 2024
1 parent b34d1b4 commit 641a787
Showing 18 changed files with 859 additions and 143 deletions.
23 changes: 11 additions & 12 deletions backend/alembic.ini
@@ -68,18 +68,17 @@ sqlalchemy.url = driver://user:pass@localhost/dbname
# on newly generated revision scripts. See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
hooks = black isort
black.type = console_scripts
black.entrypoint = black
black.options = REVISION_SCRIPT_FILENAME
isort.type = console_scripts
isort.entrypoint = isort
isort.options = --profile black REVISION_SCRIPT_FILENAME
hooks = ruff, ruff_format

# lint with attempts to fix using ruff
ruff.type = exec
ruff.executable = ruff
ruff.options = check --fix REVISION_SCRIPT_FILENAME

# format using ruff
ruff_format.type = exec
ruff_format.executable = ruff
ruff_format.options = format REVISION_SCRIPT_FILENAME


# Logging configuration
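For reference, the two exec hooks added above amount to running ruff twice against the freshly generated revision script; a small Python sketch of the equivalent invocation (the revision path is illustrative):

```python
# What the two exec hooks above amount to, as a subprocess sketch.
import subprocess

revision_script = "migrations/versions/1234_example.py"  # illustrative path
subprocess.run(["ruff", "check", "--fix", revision_script], check=True)
subprocess.run(["ruff", "format", revision_script], check=True)
```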
104 changes: 83 additions & 21 deletions backend/api/constants.py
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import tempfile
import uuid
from dataclasses import dataclass, field
from pathlib import Path

@@ -21,45 +21,106 @@ class BackendConf:
    Backend configuration, read from environment variables and set default values.
    """

    logger: logging.Logger = field(init=False)
    # Configuration
    project_expire_after: datetime.timedelta = datetime.timedelta(days=7)
    project_quota: int = 0
    chunk_size: int = 1024  # reading/writing received files
    illustration_quota: int = 0
    api_version_prefix: str = "/v1"  # our API

    # Database
    postgres_uri: str = os.getenv("POSTGRES_URI") or "nodb"

    # Scheduler process
    redis_uri: str = os.getenv("REDIS_URI") or "redis://localhost:6379/0"
    channel_name: str = os.getenv("CHANNEL_NAME") or "s3_upload"

    # Mandatory configurations
    postgres_uri = os.getenv("POSTGRES_URI", "nodb")
    s3_url_with_credentials = os.getenv("S3_URL_WITH_CREDENTIALS")
    private_salt = os.getenv("PRIVATE_SALT")
    # Transient (on host disk) Storage
    transient_storage_path: Path = Path()

    # Optional configuration.
    s3_max_tries = int(os.getenv("S3_MAX_TRIES", "3"))
    s3_retry_wait = humanfriendly.parse_timespan(os.getenv("S3_RETRY_TIMES", "10s"))
    s3_deletion_delay = datetime.timedelta(
    # S3 Storage
    s3_url_with_credentials: str = os.getenv("S3_URL_WITH_CREDENTIALS") or ""
    s3_max_tries: int = int(os.getenv("S3_MAX_TRIES", "3"))
    s3_retry_wait: int = int(
        humanfriendly.parse_timespan(os.getenv("S3_RETRY_TIMES") or "10s")
    )
    s3_deletion_delay: datetime.timedelta = datetime.timedelta(
        hours=int(os.getenv("S3_REMOVE_DELETEDUPLOADING_AFTER_HOURS", "12"))
    )
    transient_storage_path = Path(
        os.getenv("TRANSIENT_STORAGE_PATH", tempfile.gettempdir())
    ).resolve()
    redis_uri = os.getenv("REDIS_URI", "redis://localhost:6379/0")
    channel_name = os.getenv("CHANNEL_NAME", "s3_upload")
    private_salt = os.getenv(
        "PRIVATE_SALT", uuid.uuid4().hex
    )  # used to make S3 keys unguessable

    # Cookies
    cookie_domain = os.getenv("COOKIE_DOMAIN", None)
    cookie_expiration_days = int(os.getenv("COOKIE_EXPIRATION_DAYS", "30"))
    project_quota = humanfriendly.parse_size(os.getenv("PROJECT_QUOTA", "100MB"))
    chunk_size = humanfriendly.parse_size(os.getenv("CHUNK_SIZE", "2MiB"))
    illustration_quota = humanfriendly.parse_size(
        os.getenv("ILLUSTRATION_QUOTA", "2MiB")
    authentication_cookie_name: str = "user_id"

    # Deployment
    public_url: str = os.getenv("PUBLIC_URL") or "http://localhost"
    download_url: str = (
        os.getenv("DOWNLOAD_URL")
        or "https://s3.us-west-1.wasabisys.com/org-kiwix-zimit/zim"
    )
    allowed_origins = os.getenv(
        "ALLOWED_ORIGINS",
        "http://localhost",
    ).split("|")

    authentication_cookie_name: str = "user_id"
    api_version_prefix = "/v1"
    project_expire_after = datetime.timedelta(days=7)
    # Zimfarm (3rd party API creating ZIMs and calling back with feedback)
    zimfarm_api_url: str = (
        os.getenv("ZIMFARM_API_URL") or "https://api.farm.zimit.kiwix.org/v1"
    )
    zimfarm_username: str = os.getenv("ZIMFARM_API_USERNAME") or ""
    zimfarm_password: str = os.getenv("ZIMFARM_API_PASSWORD") or ""
    zimfarm_nautilus_image: str = (
        os.getenv("ZIMFARM_NAUTILUS_IMAGE") or "ghcr.io/openzim/nautilus:latest"
    )
    zimfarm_task_cpu: int = int(os.getenv("ZIMFARM_TASK_CPU") or "3")
    zimfarm_task_memory: int = 0
    zimfarm_task_disk: int = 0
    zimfarm_callback_base_url = os.getenv("ZIMFARM_CALLBACK_BASE_URL", "")
    zimfarm_callback_token = os.getenv("ZIMFARM_CALLBACK_TOKEN", uuid.uuid4().hex)
    zimfarm_task_worker: str = os.getenv("ZIMFARM_TASK_WORKER") or "-"
    zimfarm_request_timeout_sec: int = 10

    # Mailgun (3rd party API to send emails)
    mailgun_api_url: str = os.getenv("MAILGUN_API_URL") or ""
    mailgun_api_key: str = os.getenv("MAILGUN_API_KEY") or ""
    mailgun_from: str = os.getenv("MAILGUN_FROM") or "Nautilus ZIM"
    mailgun_request_timeout_sec: int = 10

    logger: logging.Logger = field(init=False)

    def __post_init__(self):
        self.logger = logging.getLogger(Path(__file__).parent.name)
        self.transient_storage_path.mkdir(exist_ok=True)
        self.job_retry = Retry(max=self.s3_max_tries, interval=int(self.s3_retry_wait))

        self.transient_storage_path = Path(
            os.getenv("TRANSIENT_STORAGE_PATH") or tempfile.gettempdir()
        ).resolve()

        self.project_quota = humanfriendly.parse_size(
            os.getenv("PROJECT_QUOTA") or "100MB"
        )

        self.chunk_size = humanfriendly.parse_size(os.getenv("CHUNK_SIZE", "2MiB"))

        self.illustration_quota = humanfriendly.parse_size(
            os.getenv("ILLUSTRATION_QUOTA", "2MiB")
        )

        self.zimfarm_task_memory = humanfriendly.parse_size(
            os.getenv("ZIMFARM_TASK_MEMORY") or "1000MiB"
        )
        self.zimfarm_task_disk = humanfriendly.parse_size(
            os.getenv("ZIMFARM_TASK_DISK") or "200MiB"
        )

        if not self.zimfarm_callback_base_url:
            self.zimfarm_callback_base_url = f"{self.zimfarm_api_url}/requests/hook"


constants = BackendConf()
logger = constants.logger
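
As a quick reference for the size and timespan defaults above, here is what the humanfriendly parsing yields (standalone snippet, safe to run anywhere):

```python
# Quick illustration of the humanfriendly parsing used by BackendConf above.
import humanfriendly

print(humanfriendly.parse_size("100MB"))    # 100000000 (decimal units)
print(humanfriendly.parse_size("2MiB"))     # 2097152 (binary units)
print(humanfriendly.parse_timespan("10s"))  # 10.0 (seconds, as a float)
```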
1 change: 1 addition & 0 deletions backend/api/database/models.py
@@ -137,6 +137,7 @@ class Archive(Base):
    filesize: Mapped[int | None]
    created_on: Mapped[datetime]
    requested_on: Mapped[datetime | None]
    completed_on: Mapped[datetime | None]
    download_url: Mapped[str | None]
    collection_json_path: Mapped[str | None]
    status: Mapped[str]
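The new completed_on column gives the /hook endpoint somewhere to stamp terminal status updates. A hypothetical sketch of such an update (handler name and status strings are assumptions, not this commit's code):

```python
# Hypothetical sketch: function name and status strings are assumptions.
from datetime import datetime, timezone
from uuid import UUID

from api.database import Session as DBSession
from api.database.models import Archive


def record_status_update(archive_id: UUID, status: str) -> None:
    with DBSession.begin() as session:
        archive = session.get(Archive, archive_id)
        if archive is None:
            raise ValueError(f"Archive not found: {archive_id}")
        archive.status = status
        if status in ("succeeded", "failed", "canceled"):  # assumed terminal states
            archive.completed_on = datetime.now(timezone.utc)
```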
28 changes: 28 additions & 0 deletions backend/api/database/utils.py
@@ -0,0 +1,28 @@
from uuid import UUID

from sqlalchemy import select

from api.database import Session as DBSession
from api.database.models import File, Project


def get_file_by_id(file_id: UUID) -> File:
    """Get File instance by its id."""
    with DBSession.begin() as session:
        stmt = select(File).where(File.id == file_id)
        file = session.execute(stmt).scalar()
        if not file:
            raise ValueError(f"File not found: {file_id}")
        session.expunge(file)
        return file


def get_project_by_id(project_id: UUID) -> Project:
    """Get Project instance by its id."""
    with DBSession.begin() as session:
        stmt = select(Project).where(Project.id == project_id)
        project = session.execute(stmt).scalar()
        if not project:
            raise ValueError(f"Project not found: {project_id}")
        session.expunge(project)
        return project
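
Both helpers expunge the instance before the session is committed and closed, so callers get a detached object whose column attributes stay readable outside the transaction. A usage sketch (the UUID is made up, and the name attribute assumes Project has such a column):

```python
# Usage sketch; the UUID is made up and `name` assumes Project has that column.
from uuid import UUID

from api.database.utils import get_project_by_id

project = get_project_by_id(UUID("00000000-0000-0000-0000-000000000000"))
# detached instance: plain columns are loaded, but unloaded lazy relationships
# would raise DetachedInstanceError if accessed
print(project.name)
```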
75 changes: 75 additions & 0 deletions backend/api/email.py
@@ -0,0 +1,75 @@
from collections.abc import Iterable
from pathlib import Path
from typing import Any

import humanfriendly
import requests
from jinja2 import Environment, FileSystemLoader, select_autoescape
from werkzeug.datastructures import MultiDict

from api.constants import constants, logger
from api.database.models import Archive

jinja_env = Environment(
    loader=FileSystemLoader("templates"),
    autoescape=select_autoescape(["html", "txt"]),
)
jinja_env.filters["short_id"] = lambda value: str(value)[:5]
jinja_env.filters["format_size"] = lambda value: humanfriendly.format_size(
    value, binary=True
)


def send_email_via_mailgun(
    to: Iterable[str] | str,
    subject: str,
    contents: str,
    cc: Iterable[str] | None = None,
    bcc: Iterable[str] | None = None,
    attachments: Iterable[Path] | None = None,
) -> str:
    if not constants.mailgun_api_url or not constants.mailgun_api_key:
        logger.warning(f"Mailgun not configured, ignoring email request to: {to!s}")
        return ""

    values = [
        ("from", constants.mailgun_from),
        ("subject", subject),
        ("html", contents),
    ]

    # a str is itself Iterable, so test for str first to avoid splitting an
    # address into characters; skip cc/bcc entirely when not provided
    values += [("to", [to] if isinstance(to, str) else list(to))]
    if cc:
        values += [("cc", list(cc))]
    if bcc:
        values += [("bcc", list(bcc))]
    data = MultiDict(values)

    try:
        resp = requests.post(
            url=f"{constants.mailgun_api_url}/messages",
            auth=("api", constants.mailgun_api_key),
            data=data,
            files=(
                [
                    ("attachment", (fpath.name, fpath.read_bytes()))
                    for fpath in attachments
                ]
                if attachments
                else []
            ),
            timeout=constants.mailgun_request_timeout_sec,
        )
        resp.raise_for_status()
    except Exception as exc:
        logger.error(f"Failed to send mailgun notification: {exc}")
        logger.exception(exc)
        return ""  # resp may be unbound here; report failure with an empty id
    return resp.json().get("id") or resp.text


def get_context(task: dict[str, Any], archive: Archive):
    """Jinja context dict for email notifications"""
    return {
        "base_url": constants.public_url,
        "download_url": constants.download_url,
        "task": task,
        "archive": archive,
    }
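
Mailgun's messages endpoint accepts repeated form fields (several to recipients, several attachment parts), which is why the payload is built as a MultiDict rather than a plain dict; a small illustration:

```python
# Why MultiDict: it preserves repeated form fields that a flat dict collapses.
from werkzeug.datastructures import MultiDict

data = MultiDict([("to", "a@example.org"), ("to", "b@example.org")])
print(data.getlist("to"))  # ['a@example.org', 'b@example.org']
print(data.to_dict())      # {'to': 'a@example.org'} -- flat view keeps only the first
```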
67 changes: 67 additions & 0 deletions backend/api/files.py
@@ -0,0 +1,67 @@
import hashlib
from collections.abc import Iterator
from pathlib import Path
from typing import BinaryIO
from uuid import UUID

from api.constants import constants
from api.database import get_local_fpath_for


def calculate_file_size(file: BinaryIO) -> int:
    """Calculate the size of a file chunk by chunk"""
    size = 0
    for chunk in read_file_in_chunks(file):
        size += len(chunk)
    return size


def read_file_in_chunks(
    reader: BinaryIO, chunk_size=constants.chunk_size
) -> Iterator[bytes]:
    """Read a big file chunk by chunk. Default chunk size is CHUNK_SIZE (2 MiB)"""
    while True:
        chunk = reader.read(chunk_size)
        if not chunk:
            break
        yield chunk
    reader.seek(0)


def save_file(file: BinaryIO, file_name: str, project_id: UUID) -> Path:
    """Saves a binary file to a specific location and returns its path."""
    fpath = get_local_fpath_for(file_name, project_id)
    if not fpath.is_file():
        with open(fpath, "wb") as file_object:
            for chunk in read_file_in_chunks(file):
                file_object.write(chunk)
    return fpath


def generate_file_hash(file: BinaryIO) -> str:
    """Generate sha256 hash of a file, optimized for large files"""
    hasher = hashlib.sha256()
    for chunk in read_file_in_chunks(file):
        hasher.update(chunk)
    return hasher.hexdigest()


def normalize_filename(filename: str) -> str:
    """filesystem (ext4, apfs, hfs+, ntfs, exfat) and S3 compliant filename"""

    normalized = str(filename)

    # we replace / with __ as it would have a meaning
    replacements = (("/", "__"),)
    for pattern, repl in replacements:
        normalized = normalized.replace(pattern, repl)

    # other prohibited chars are removed (mostly for Windows context)
    removals = ["\\", ":", "*", "?", '"', "<", ">", "|"] + [
        chr(idx) for idx in range(1, 32)
    ]
    for char in removals:
        normalized = normalized.replace(char, "")

    # ext4/exfat has a 255B filename limit (s3 is 1KiB)
    return normalized.encode("utf-8")[:255].decode("utf-8")
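
A short usage sketch of the chunked helpers above, showing that each pass rewinds the reader so size, hash, and save can reuse the same stream (assumes the api package is importable):

```python
# Usage sketch for the chunked file helpers above.
import io

from api.files import calculate_file_size, generate_file_hash, normalize_filename

payload = io.BytesIO(b"hello nautilus")
print(calculate_file_size(payload))        # 14 -- reader is rewound afterwards
print(generate_file_hash(payload))         # sha256 hex digest of the same stream
print(normalize_filename("a/b:c*d?.zim"))  # 'a__bcd.zim'
```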
