Merge pull request #98 from openzim/webdav_storage
WebDAV storage
rgaudin authored Oct 4, 2024
2 parents e92971b + 84f3ded commit 2e85fbd
Showing 27 changed files with 1,231 additions and 196 deletions.
29 changes: 22 additions & 7 deletions backend/api/constants.py
@@ -4,6 +4,7 @@
import tempfile
import uuid
from dataclasses import dataclass, field
from enum import StrEnum
from pathlib import Path
from uuid import UUID

@@ -19,6 +20,11 @@ def determine_mandatory_environment_variables():
raise OSError(f"Please set the {variable} environment variable")


class StorageType(StrEnum):
WEBDAV = "webdav"
S3 = "s3"


@dataclass(kw_only=True)
class BackendConf:
"""
@@ -40,7 +46,7 @@ class BackendConf:

# Scheduler process
redis_uri: str = os.getenv("REDIS_URI") or "redis://localhost:6379/0"
channel_name: str = os.getenv("CHANNEL_NAME") or "s3_upload"
channel_name: str = os.getenv("CHANNEL_NAME") or "storage_upload"

# Transient (on host disk) Storage
transient_storage_path: Path = Path()
@@ -52,24 +58,27 @@ class BackendConf:
humanfriendly.parse_timespan(os.getenv("S3_RETRY_TIMES") or "10s")
)
s3_deletion_delay: datetime.timedelta = datetime.timedelta(
hours=int(os.getenv("S3_REMOVE_DELETEDUPLOADING_AFTER_HOURS", "12"))
hours=int(os.getenv("S3_REMOVE_DELETEDUPLOADING_AFTER_HOURS", "25"))
)
private_salt = os.getenv(
"PRIVATE_SALT", uuid.uuid4().hex
) # used to make S3 keys unguessable

# WebDAV Storage
webdav_url_with_credentials: str = os.getenv("WEBDAV_URL_WITH_CREDENTIALS") or ""
# webdav4 uses a single timeout mechanism for all requests,
# including the upload (PUT) ones which can be large
webdav_request_timeout_sec: int = int(
os.getenv("WEBDAV_REQUEST_TIMEOUT_SEC") or "900"
)

# Cookies
cookie_domain = os.getenv("COOKIE_DOMAIN", None)
cookie_expiration_days = int(os.getenv("COOKIE_EXPIRATION_DAYS", "30"))
authentication_cookie_name: str = "user_id"

# Deployment
public_url: str = os.getenv("PUBLIC_URL") or "http://localhost"
# /!\ this must match the region/bucket on s3 credentials
download_url: str = (
os.getenv("DOWNLOAD_URL")
or "https://s3.eu-west-2.wasabisys.com/org-kiwix-nautilus"
)
allowed_origins = os.getenv(
"ALLOWED_ORIGINS",
"http://localhost",
@@ -138,6 +147,12 @@ def __post_init__(self):
def single_user(self) -> UUID:
return UUID(self.single_user_id)

@property
def storage_type(self) -> StorageType:
if self.webdav_url_with_credentials:
return StorageType.WEBDAV
return StorageType.S3


constants = BackendConf()
logger = constants.logger
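
The Storage implementation that consumes these settings lives in api/storage.py, which is not among the files shown in this excerpt. As a rough sketch of how the WebDAV settings could be wired into a webdav4 client, assuming webdav4's Client accepts auth and timeout keyword arguments (the helper below is illustrative, not the PR's actual code):

    from urllib.parse import urlsplit, urlunsplit

    from webdav4.client import Client

    from api.constants import constants


    def make_webdav_client() -> Client:
        """Illustrative sketch: build a webdav4 client from the settings above.

        WEBDAV_URL_WITH_CREDENTIALS embeds the credentials in the URL
        (https://user:pass@host/path), so split them out and pass them as
        HTTP auth. The single timeout covers every request, including large
        PUT uploads, hence the generous 900s default.
        """
        parts = urlsplit(constants.webdav_url_with_credentials)
        netloc = parts.hostname or ""
        if parts.port:
            netloc = f"{netloc}:{parts.port}"
        # rebuild the base URL without the user:pass@ part
        base_url = urlunsplit((parts.scheme, netloc, parts.path, "", ""))
        return Client(
            base_url,
            auth=(parts.username or "", parts.password or ""),
            timeout=constants.webdav_request_timeout_sec,
        )
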
14 changes: 11 additions & 3 deletions backend/api/database/models.py
@@ -142,7 +142,9 @@ class User(Base):
created_on: Mapped[datetime]

projects: Mapped[list["Project"]] = relationship(
back_populates="user", cascade="all, delete-orphan"
back_populates="user",
cascade="all, delete-orphan",
order_by="desc(Project.created_on)",
)


@@ -165,8 +167,14 @@ class Project(Base):

user: Mapped[User] = relationship(back_populates="projects", init=False)

files: Mapped[list["File"]] = relationship(cascade="all, delete-orphan")
archives: Mapped[list["Archive"]] = relationship(cascade="all, delete-orphan")
webdav_path: Mapped[str | None]

files: Mapped[list["File"]] = relationship(
cascade="all, delete-orphan", order_by="File.id"
)
archives: Mapped[list["Archive"]] = relationship(
cascade="all, delete-orphan", order_by="desc(Archive.created_on)"
)

@property
def used_space(self):
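
The new nullable webdav_path column on Project implies a schema migration, which is not part of the excerpt above. A hypothetical Alembic revision for it, assuming the project uses Alembic and a table named project (both assumptions; the revision identifiers are placeholders):

    """add webdav_path to project (hypothetical sketch, not the PR's migration)"""

    import sqlalchemy as sa
    from alembic import op

    # placeholder identifiers; a real revision is generated with `alembic revision`
    revision = "000000000000"
    down_revision = "ffffffffffff"


    def upgrade() -> None:
        # nullable, so existing S3-era projects need no backfill
        op.add_column("project", sa.Column("webdav_path", sa.String(), nullable=True))


    def downgrade() -> None:
        op.drop_column("project", "webdav_path")
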
6 changes: 3 additions & 3 deletions backend/api/entrypoint.py
@@ -2,13 +2,13 @@

import uvicorn

from api import storage
from api.main import create_app # pragma: no cover
from api.s3 import s3_storage

# dont check S3 credentials URL in tests
if "pytest" not in sys.modules:
# raises should S3 credentials URL be malformed
_ = s3_storage.storage
# raises should S3 credentials URL be malformed (attempts to use)
_ = storage.storage
app = create_app() # pragma: no cover


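
api/storage.py itself is not shown in this diff, but the call sites in archives.py and files.py below pin down the surface the module-level storage singleton has to expose: path helpers, an existence check, uploads, autodelete, deletion, and a public_url used to build download links. A sketch of that inferred interface as a typing Protocol (argument names are taken from the calls; the types are guesses from context, not the actual module):

    from datetime import datetime
    from pathlib import Path
    from typing import BinaryIO, Protocol

    from api.database.models import File, Project


    class StorageInterface(Protocol):
        """Surface implied by the call sites in this diff."""

        # base URL that, combined with a file path, yields a public download URL
        public_url: str

        def get_file_path(self, *, file: File) -> str: ...

        def get_companion_file_path(
            self, *, project: Project, file_hash: str, suffix: str
        ) -> str: ...

        def has(self, path: str) -> bool: ...

        def upload_file(self, *, fpath: Path, path: str) -> None: ...

        def upload_fileobj(self, *, fileobj: BinaryIO, path: str) -> None: ...

        def set_autodelete_on(self, path: str, on: datetime | None) -> None: ...

        def delete(self, path: str) -> None: ...
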
67 changes: 33 additions & 34 deletions backend/api/routes/archives.py
@@ -26,7 +26,7 @@
read_file_in_chunks,
)
from api.routes import userless_validated_project, validated_project
from api.s3 import s3_file_key, s3_storage
from api.storage import storage
from api.zimfarm import RequestSchema, WebhookPayload, request_task

router = APIRouter()
@@ -289,7 +289,7 @@ def gen_collection_for(project: Project) -> tuple[list[dict[str, Any]], BinaryIO
entry["authors"] = ", ".join(file.authors)
entry["files"] = [
{
"url": f"{constants.download_url}/{s3_file_key(project.id, file.hash)}",
"url": f"{storage.public_url}{storage.get_file_path(file=file)}",
"filename": file.filename,
}
]
@@ -304,22 +304,19 @@ def gen_collection_for(project: Project) -> tuple[list[dict[str, Any]], BinaryIO
return collection, file, digest


def get_file_key(project_id: UUID, file_hash: str, suffix: str) -> str:
# suffix useful to debug live URLs in-browser
return f"{s3_file_key(project_id=project_id, file_hash=file_hash)}{suffix}"


def upload_file_to_s3(project: Project, file: BinaryIO, s3_key: str):
def upload_file_to_storage(project: Project, file: BinaryIO, storage_path: str):

try:
if s3_storage.storage.has_object(s3_key):
logger.debug(f"Object `{s3_key}` already in S3… weird but OK")
# only in single-user mode are users allowed to overwrite
if not constants.single_user_id and storage.has(storage_path):
logger.debug(f"Object `{storage_path}` already in Storage… weird but OK")
return
logger.debug(f"Uploading file to `{s3_key}`")
s3_storage.storage.upload_fileobj(fileobj=file, key=s3_key)
s3_storage.storage.set_object_autodelete_on(s3_key, project.expire_on)
logger.debug(f"Uploading file to `{storage_path}`")
storage.upload_fileobj(fileobj=file, path=storage_path)
logger.debug(f"Setting autodelete to `{project.expire_on}`")
storage.set_autodelete_on(storage_path, project.expire_on)
except Exception as exc:
logger.error(f"File failed to upload to s3 `{s3_key}`: {exc}")
logger.error(f"File failed to upload to Storage `{storage_path}`: {exc}")
raise exc


@@ -357,39 +354,43 @@ async def request_archive(

# upload illustration
illustration = io.BytesIO(base64.b64decode(archive.config.illustration))
illus_key = get_file_key(
project_id=archive.project_id,
illus_key = storage.get_companion_file_path(
project=project,
file_hash=generate_file_hash(illustration),
suffix=".png",
suffix="illustration.png",
)
illustration.seek(0)
# upload it to S3
upload_file_to_s3(project=project, file=illustration, s3_key=illus_key)
# upload it to Storage
upload_file_to_storage(project=project, file=illustration, storage_path=illus_key)

# upload main-logo
if archive.config.main_logo:
main_logo = io.BytesIO(base64.b64decode(archive.config.main_logo))
main_logo_key = get_file_key(
project_id=archive.project_id,
main_logo_key = storage.get_companion_file_path(
project=project,
file_hash=generate_file_hash(main_logo),
suffix=".png",
suffix="main-logo.png",
)
main_logo.seek(0)
# upload it to S3
upload_file_to_s3(project=project, file=main_logo, s3_key=main_logo_key)
# upload it to Storage
upload_file_to_storage(
project=project, file=main_logo, storage_path=main_logo_key
)

# gen collection and stream
collection, collection_file, collection_hash = gen_collection_for(project=project)
collection_key = get_file_key(
project_id=archive.project_id, file_hash=collection_hash, suffix=".json"
collection_key = storage.get_companion_file_path(
project=project, file_hash=collection_hash, suffix="collection.json"
)

# upload it to S3
upload_file_to_s3(project=project, file=collection_file, s3_key=collection_key)
# upload it to Storage
upload_file_to_storage(
project=project, file=collection_file, storage_path=collection_key
)

# Everything's on S3, prepare and submit a ZF request
# Everything's on Storage, prepare and submit a ZF request
request_def = RequestSchema(
collection_url=f"{constants.download_url}/{collection_key}",
collection_url=f"{storage.public_url}/{collection_key}",
name=archive.config.name,
title=archive.config.title,
description=archive.config.description,
@@ -399,11 +400,9 @@
publisher=archive.config.publisher,
tags=archive.config.tags,
main_logo_url=(
f"{constants.download_url}/{main_logo_key}"
if archive.config.main_logo
else ""
f"{storage.public_url}/{main_logo_key}" if archive.config.main_logo else ""
),
illustration_url=f"{constants.download_url}/{illus_key}",
illustration_url=f"{storage.public_url}/{illus_key}",
)
task_id = request_task(
project_id=project.id,
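
Two URL-building patterns appear above: collection entries concatenate storage.public_url and storage.get_file_path() directly, while collection_url, main_logo_url and illustration_url insert a "/" before the companion key. This suggests get_file_path() returns a path with a leading slash while get_companion_file_path() does not; neither implementation is shown here, so the values below are purely hypothetical:

    # hypothetical values, for illustration only
    public_url = "https://dav.example.org/nautilus"    # storage.public_url
    file_path = "/proj-1/8d3a9f-document.pdf"          # storage.get_file_path(file=file)
    companion_key = "proj-1/5c2b7e-collection.json"    # storage.get_companion_file_path(...)

    file_url = f"{public_url}{file_path}"              # separator comes from the path itself
    collection_url = f"{public_url}/{companion_key}"   # separator added explicitly
    # both resolve under https://dav.example.org/nautilus/proj-1/...
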
59 changes: 30 additions & 29 deletions backend/api/routes/files.py
@@ -16,7 +16,7 @@
from api.database.utils import get_file_by_id, get_project_by_id
from api.files import calculate_file_size, generate_file_hash, save_file
from api.routes import validated_project
from api.s3 import s3_file_key, s3_storage
from api.storage import storage
from api.store import task_queue

router = APIRouter()
@@ -48,7 +48,7 @@ class FileModel(BaseModel):

class FileStatus(str, Enum):
LOCAL = "LOCAL"
S3 = "S3"
STORAGE = "STORAGE"
FAILURE = "FAILURE"
PROCESSING = "PROCESSING"

@@ -130,47 +130,50 @@ def update_file_path(file: File, path: str):
update_file_status_and_path(file, file.status, path)


def upload_file_to_s3(new_file_id: UUID):
"""Update local file to S3 storage and update file status"""
def upload_file_to_storage(new_file_id: UUID):
"""Update local file to Storage and update file status"""
new_file = get_file_by_id(new_file_id)
project = get_project_by_id(new_file.project_id)
if not project.expire_on:
if not constants.single_user_id and not project.expire_on:
raise ValueError(f"Project: {project.id} does not have expire date.")

if new_file.status == FileStatus.PROCESSING:
return
else:
update_file_status(new_file, FileStatus.PROCESSING)

s3_key = s3_file_key(new_file.project_id, new_file.hash)

try:
if s3_storage.storage.has_object(s3_key):
logger.debug(f"Object `{s3_key}` for File {new_file_id} already in S3")
update_file_status_and_path(new_file, FileStatus.S3, s3_key)
storage_path = storage.get_file_path(file=new_file)
if storage.has(storage_path):
logger.debug(
f"Object `{storage_path}` for File {new_file_id} already in Storage"
)
update_file_status_and_path(new_file, FileStatus.STORAGE, storage_path)
return
logger.debug(f"Uploading {new_file_id}: `{new_file.local_fpath}` to `{s3_key}`")
s3_storage.storage.upload_file(fpath=new_file.local_fpath, key=s3_key)
logger.debug(
f"Uploading {new_file_id}: `{new_file.local_fpath}` to `{storage_path}`"
)
storage.upload_file(fpath=new_file.local_fpath, path=storage_path)
logger.debug(f"Uploaded {new_file_id}. Removing `{new_file.local_fpath}`…")
new_file.local_fpath.unlink(missing_ok=True)
s3_storage.storage.set_object_autodelete_on(s3_key, project.expire_on)
update_file_status_and_path(new_file, FileStatus.S3, s3_key)
storage.set_autodelete_on(storage_path, project.expire_on)
update_file_status_and_path(new_file, FileStatus.STORAGE, storage_path)
except Exception as exc:
logger.error(f"File: {new_file_id} failed to upload to s3: {exc}")
logger.error(f"File: {new_file_id} failed to upload to Storage: {exc}")
update_file_status(new_file, FileStatus.FAILURE)
raise exc


def delete_key_from_s3(s3_file_key: str):
"""Delete files from S3."""
logger.warning(f"File: {s3_file_key} starts deletion from S3")
if not s3_storage.storage.has_object(s3_file_key):
logger.debug(f"{s3_file_key} does not exist in S3")
def delete_from_storage(storage_path: str):
"""Delete files from Storage."""
logger.warning(f"File: {storage_path} starts deletion from Storage")
if not storage.has(storage_path):
logger.debug(f"{storage_path} does not exist in Storage")
return
try:
s3_storage.storage.delete_object(s3_file_key)
storage.delete(storage_path)
except Exception as exc:
logger.error(f"File: {s3_file_key} failed to be deleted from S3")
logger.error(f"File: {storage_path} failed to be deleted from Storage")
logger.exception(exc)


@@ -239,7 +242,7 @@ async def create_file(
indep_session.refresh(new_file)
file_id = str(new_file.id)
# request file upload by rq-worker
task_queue.enqueue(upload_file_to_s3, file_id, retry=constants.job_retry)
task_queue.enqueue(upload_file_to_storage, file_id, retry=constants.job_retry)

# fetch File from DB in this session to return it
file = session.execute(select(File).filter_by(id=file_id)).scalar()
@@ -295,14 +298,12 @@ async def delete_file(
if number_of_duplicate_files == 1:
if file.status == FileStatus.LOCAL:
file.local_fpath.unlink(missing_ok=True)
if file.status == FileStatus.S3:
task_queue.enqueue(
delete_key_from_s3, s3_file_key(file.project_id, file.hash)
)
if file.status == FileStatus.STORAGE:
task_queue.enqueue(delete_from_storage, storage.get_file_path(file=file))
if file.status == FileStatus.PROCESSING:
task_queue.enqueue_at(
datetime.datetime.now(tz=datetime.UTC) + constants.s3_deletion_delay,
delete_key_from_s3,
s3_file_key(file.project_id, file.hash),
delete_from_storage,
storage.get_file_path(file=file),
)
session.delete(file)
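
Because every storage interaction in files.py now goes through this indirection, the upload worker can be exercised without S3 or WebDAV credentials. A hypothetical in-memory stand-in covering just the calls made in this diff (not part of this PR, and whether its own tests do something similar is not visible in this excerpt):

    import datetime
    from pathlib import Path
    from typing import BinaryIO


    class InMemoryStorage:
        """Fake storage for tests: implements only has(), upload_file(),
        upload_fileobj(), set_autodelete_on(), delete(), get_file_path()
        and public_url, i.e. what the routes in this diff call."""

        public_url = "http://storage.test"

        def __init__(self) -> None:
            self.objects: dict[str, bytes] = {}
            self.autodelete: dict[str, datetime.datetime] = {}

        def get_file_path(self, *, file) -> str:
            # made-up layout; the real scheme lives in api/storage.py
            return f"/{file.project_id}/{file.hash}"

        def has(self, path: str) -> bool:
            return path in self.objects

        def upload_file(self, *, fpath: Path, path: str) -> None:
            self.objects[path] = fpath.read_bytes()

        def upload_fileobj(self, *, fileobj: BinaryIO, path: str) -> None:
            self.objects[path] = fileobj.read()

        def set_autodelete_on(self, path: str, on: datetime.datetime | None) -> None:
            if on is not None:
                self.autodelete[path] = on

        def delete(self, path: str) -> None:
            self.objects.pop(path, None)
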
