Skip to content

Commit

Permalink
Switch from use of TypedDict to pydantic BaseModel
Browse files Browse the repository at this point in the history
  • Loading branch information
Winston-Hsiao committed Nov 20, 2024
1 parent 8500816 commit bd644a0
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 13 deletions.
10 changes: 4 additions & 6 deletions store/app/crud/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
from aiobotocore.response import StreamingBody
from boto3.dynamodb.conditions import Attr, ComparisonCondition, Key
from botocore.exceptions import ClientError
from pydantic import BaseModel
from types_aiobotocore_dynamodb.service_resource import DynamoDBServiceResource
from types_aiobotocore_s3.service_resource import S3ServiceResource
from types_aiobotocore_s3.type_defs import (
CompletedPartTypeDef,
CreateMultipartUploadOutputTypeDef,
)
from typing_extensions import TypedDict

from store.app.errors import InternalError, ItemNotFoundError
from store.app.model import StoreBaseModel
Expand All @@ -52,14 +52,14 @@
DEFAULT_PART_SIZE = 100 * 1024 * 1024 # 100MB default part size


class MultipartUploadPart(TypedDict):
class MultipartUploadPart(BaseModel):
"""Represents a part in a multipart upload."""

PartNumber: int
ETag: str


class MultipartUploadDetails(TypedDict):
class MultipartUploadDetails(BaseModel):
"""Details needed for multipart upload."""

upload_id: str
Expand Down Expand Up @@ -617,9 +617,7 @@ async def _generate_presigned_urls(

async def _complete_multipart_upload(self, key: str, upload_id: str, parts: list[MultipartUploadPart]) -> None:
"""Completes a multipart upload."""
completed_parts: Sequence[CompletedPartTypeDef] = [
{"PartNumber": p["PartNumber"], "ETag": p["ETag"]} for p in parts
]
completed_parts: Sequence[CompletedPartTypeDef] = [{"PartNumber": p.PartNumber, "ETag": p.ETag} for p in parts]

await self.s3.meta.client.complete_multipart_upload(
Bucket=settings.s3.bucket, Key=key, UploadId=upload_id, MultipartUpload={"Parts": completed_parts}
Expand Down
8 changes: 3 additions & 5 deletions store/app/crud/kclips.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""Defines the CRUD interface for handling user-uploaded KClips."""

from typing_extensions import TypedDict
from pydantic import BaseModel

from store.app.crud.base import BaseCrud, MultipartUploadDetails, MultipartUploadPart
from store.app.model import KClip


class KClipPartCompleted(TypedDict):
class KClipPartCompleted(BaseModel):
"""Represents a completed part in a multipart upload."""

part_number: int
Expand All @@ -27,7 +27,6 @@ async def create_kclip(
file_size: int | None = None,
part_size: int | None = None,
) -> tuple[KClip, MultipartUploadDetails]:
"""Creates a KClip and initializes multipart upload."""
kclip = KClip.create(user_id=user_id, robot_id=robot_id, name=name, description=description)

await self._add_item(kclip)
Expand All @@ -40,14 +39,13 @@ async def create_kclip(
return kclip, upload_details

async def complete_upload(self, kclip_id: str, upload_id: str, parts: list[KClipPartCompleted]) -> None:
"""Completes a multipart upload for a KClip."""
kclip = await self._get_item(kclip_id, KClip)
if not kclip:
raise ValueError("KClip not found")

# Convert to S3 expected format
s3_parts: list[MultipartUploadPart] = [
{"PartNumber": part["part_number"], "ETag": part["etag"]} for part in parts
MultipartUploadPart(PartNumber=part.part_number, ETag=part.etag) for part in parts
]

# Complete the multipart upload
Expand Down
4 changes: 2 additions & 2 deletions store/app/routers/kclips.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def create_kclip(
) -> UploadKClipResponse:
robot = await crud.get_robot(kclip_data.robot_id)
if robot is None:
raise ItemNotFoundError(f"Robot with ID {kclip_data.robot_id} not found")
raise ItemNotFoundError("Robot with ID %s not found", kclip_data.robot_id)
if robot.user_id != user.id:
verify_admin_permission(user, "upload KClips for a robot by another user")

Expand Down Expand Up @@ -73,7 +73,7 @@ async def complete_upload(
) -> CompletedKClipUploadResponse:
kclip = await crud.get_kclip(kclip_data.kclip_id)
if kclip is None:
raise ItemNotFoundError(f"KClip with ID {kclip_data.kclip_id} not found")
raise ItemNotFoundError("KClip with ID %s not found", kclip_data.kclip_id)
if kclip.user_id != user.id:
verify_admin_permission(user, "complete upload of KClip by another user")

Expand Down

0 comments on commit bd644a0

Please sign in to comment.