diff --git a/store/app/crud/kclips.py b/store/app/crud/kclips.py new file mode 100644 index 00000000..0369f172 --- /dev/null +++ b/store/app/crud/kclips.py @@ -0,0 +1,107 @@ +"""Defines the CRUD interface for handling user-uploaded KClips.""" + +from typing import TypedDict + +from boto3.session import Session +from botocore.config import Config + +from store.app.crud.base import BaseCrud +from store.app.model import KClip +from store.settings import settings + + +class KClipPart(TypedDict): + """Represents a KClip part in a multipart upload.""" + + part_number: int + url: str + + +class KClipUploadDetails(TypedDict): + upload_id: str + parts: list[KClipPart] + bucket: str + key: str + + +class KClipPartCompleted(TypedDict): + """Represents a completed part in a multipart upload. + + Fields: + PartNumber: The number of this part (1 to 10,000) + ETag: The entity tag returned when the part was uploaded + """ + + part_number: int + etag: str + + +class KClipsCrud(BaseCrud): + def __init__(self) -> None: + super().__init__() + self.s3_client = Session().client("s3", config=Config(signature_version="s3v4")) + + @classmethod + def get_gsis(cls) -> set[str]: + return super().get_gsis().union({"user_id", "robot_id"}) + + async def create_kclip( + self, user_id: str, robot_id: str, name: str, description: str + ) -> tuple[KClip, KClipUploadDetails]: + kclip = KClip.create(user_id=user_id, robot_id=robot_id, name=name, description=description) + + # Initialize multipart upload in S3 + response = self.s3_client.create_multipart_upload(Bucket=settings.s3.bucket, Key=f"kclips/{kclip.id}/{name}") + upload_id = response["UploadId"] + + # Generate presigned URLs for parts (assuming 10MB chunks) + # The actual number of parts will be determined by the CLI + presigned_urls: list[KClipPart] = [] + for part_number in range(1, 10001): # S3 supports up to 10,000 parts + presigned_url = self.s3_client.generate_presigned_url( + "upload_part", + Params={ + "Bucket": settings.s3.bucket, + "Key": f"kclips/{kclip.id}/{name}", + "UploadId": upload_id, + "PartNumber": part_number, + }, + ExpiresIn=3600, # URL expires in 1 hour + ) + presigned_urls.append({"part_number": part_number, "url": presigned_url}) + + # Store the KClip in DynamoDB + await self._add_item(kclip) + + # Return both the KClip model and the upload details + upload_details: KClipUploadDetails = { + "upload_id": upload_id, + "parts": presigned_urls, + "bucket": settings.s3.bucket, + "key": f"kclips/{kclip.id}/{name}", + } + + return kclip, upload_details + + async def complete_upload(self, kclip_id: str, upload_id: str, parts: list[KClipPartCompleted]) -> None: + """Completes a multipart upload for a KClip. + + Args: + kclip_id: The ID of the KClip + upload_id: The upload ID from S3 + parts: List of completed parts with ETag information + """ + kclip = await self._get_item(kclip_id, KClip) + if not kclip: + raise ValueError("KClip not found") + + # Complete the multipart upload + self.s3_client.complete_multipart_upload( + Bucket=settings.s3.bucket, + Key=f"kclips/{kclip_id}/{kclip.name}", + UploadId=upload_id, + MultipartUpload={"Parts": [{"PartNumber": part["part_number"], "ETag": part["etag"]} for part in parts]}, + ) + + # Update KClip status if needed + await self._update_item(kclip_id, KClip, {"upload_status": "completed"}) diff --git a/store/app/db.py b/store/app/db.py index eecfa74b..454646cf 100644 --- a/store/app/db.py +++ b/store/app/db.py @@ -8,6 +8,7 @@ from store.app.crud.artifacts import ArtifactsCrud from store.app.crud.base import TABLE_NAME, BaseCrud from store.app.crud.email import EmailCrud +from store.app.crud.kclips import KClipsCrud from store.app.crud.listings import ListingsCrud from store.app.crud.onshape import OnshapeCrud from store.app.crud.orders import OrdersCrud @@ -23,6 +24,7 @@ class Crud( ListingsCrud, ArtifactsCrud, OrdersCrud, + KClipsCrud, RobotsCrud, TeleopCrud, BaseCrud, diff --git a/store/app/model.py b/store/app/model.py index 1d0e6a64..3320d39f 100644 --- a/store/app/model.py +++ b/store/app/model.py @@ -726,3 +726,37 @@ def create( created_at=now, ttl=ttl_timestamp, ) + + +KClipUploadStatus = Literal["pending", "completed"] + + +class KClip(StoreBaseModel): + """KClip recorded from robot runtime.""" + + user_id: str + robot_id: str + created_at: int + name: str + description: str | None = None + upload_status: KClipUploadStatus = "pending" + + @classmethod + def create( + cls, + user_id: str, + robot_id: str, + name: str, + description: str | None = None, + upload_status: KClipUploadStatus = "pending", + ) -> Self: + now = int(time.time()) + return cls( + id=new_uuid(), + user_id=user_id, + robot_id=robot_id, + created_at=now, + name=name, + description=description, + upload_status=upload_status, + ) diff --git a/store/app/routers/kclips.py b/store/app/routers/kclips.py new file mode 100644 index 00000000..1d158400 --- /dev/null +++ b/store/app/routers/kclips.py @@ -0,0 +1,45 @@ +"""Defines the router endpoints for handling KClips.""" + +from typing import Annotated + +from fastapi import APIRouter, Depends +from pydantic import BaseModel + +from store.app.crud.kclips import KClipPartCompleted, KClipUploadDetails +from store.app.db import Crud +from store.app.model import User +from store.app.security.user import get_session_user_with_write_permission + +router = APIRouter() + + +class NewKClipResponse(BaseModel): + kclip_id: str + upload_details: KClipUploadDetails + + +@router.post("/create") +async def create_kclip( + user: Annotated[User, Depends(get_session_user_with_write_permission)], + robot_id: str, + name: str, + description: str, + crud: Annotated[Crud, Depends(Crud.get)], +) -> NewKClipResponse: + kclip, upload_details = await crud.create_kclip( + user_id=user.id, robot_id=robot_id, name=name, description=description + ) + + return NewKClipResponse(kclip_id=kclip.id, upload_details=upload_details) + + +@router.post("/{kclip_id}/complete") +async def complete_upload( + kclip_id: str, + upload_id: str, + parts: list[KClipPartCompleted], + user: Annotated[User, Depends(get_session_user_with_write_permission)], + crud: Annotated[Crud, Depends(Crud.get)], +) -> dict: + await crud.complete_upload(kclip_id, upload_id, parts) + return {"status": "completed"}