Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
media/upload: add support for async uploads
Browse files Browse the repository at this point in the history
Signed-off-by: Sumner Evans <[email protected]>
  • Loading branch information
sumnerevans committed Apr 20, 2023
1 parent 59c040d commit a4b84b5
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 7 deletions.
65 changes: 65 additions & 0 deletions synapse/media/media_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from twisted.internet.defer import Deferred

from synapse.api.errors import (
Codes,
FederationDeniedError,
HttpResponseException,
NotFoundError,
Expand Down Expand Up @@ -193,6 +194,70 @@ async def create_media_id(self, auth_user: UserID) -> Tuple[str, int]:
)
return f"mxc://{self.server_name}/{media_id}", unused_expires_at

async def verify_can_upload(self, media_id: str, auth_user: UserID) -> None:
"""Verify that the media ID can be uploaded to by the given user. This
function checks that:
* the media ID exists
* the media ID does not already have content
* the user uploading is the same as the one who created the media ID
* the media ID has not expired
Args:
media_id: The media ID to verify
auth_user: The user_id of the uploader
"""
media = await self.store.get_local_media(media_id)
if media is None:
raise SynapseError(404, "Unknow media ID", errcode=Codes.NOT_FOUND)

if media["user_id"] != str(auth_user):
raise SynapseError(
403,
"Only the creator of the media ID can upload to it",
errcode=Codes.FORBIDDEN,
)

if media.get("media_length") is not None:
raise SynapseError(
409,
"Media ID already has content",
errcode="M_CANNOT_OVERWRITE_MEDIA",
)

if media.get("unused_expires_at", 0) < self.clock.time_msec():
raise NotFoundError("Media ID has expired")

async def update_content(
self,
media_id: str,
media_type: str,
upload_name: Optional[str],
content: IO,
content_length: int,
auth_user: UserID,
) -> None:
"""Update the content of the given media ID.
Args:
media_id: The media ID to replace.
media_type: The content type of the file.
upload_name: The name of the file, if provided.
content: A file like object that is the content to store
content_length: The length of the content
auth_user: The user_id of the uploader
"""
file_info = FileInfo(server_name=None, file_id=media_id)
fname = await self.media_storage.store_file(content, file_info)
logger.info("Stored local media in file %r", fname)

await self.store.update_local_media(
media_id=media_id,
media_type=media_type,
upload_name=upload_name,
media_length=content_length,
user_id=auth_user,
)

await self._generate_thumbnails(None, media_id, media_id, media_type)

async def create_content(
self,
media_type: str,
Expand Down
65 changes: 58 additions & 7 deletions synapse/rest/media/upload_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
# limitations under the License.

import logging
from typing import IO, TYPE_CHECKING, Dict, List, Optional
from typing import IO, TYPE_CHECKING, Dict, List, Optional, Tuple

from synapse.api.errors import Codes, SynapseError
from synapse.http.server import DirectServeJsonResource, respond_with_json
from synapse.http.servlet import parse_bytes_from_args
from synapse.http.site import SynapseRequest
from synapse.media._base import parse_media_id
from synapse.media.media_storage import SpamMediaException

if TYPE_CHECKING:
Expand All @@ -28,6 +29,9 @@

logger = logging.getLogger(__name__)

# The name of the lock to use when uploading media.
_UPLOAD_MEDIA_LOCK_NAME = "upload_media"


class UploadResource(DirectServeJsonResource):
isLeaf = True
Expand All @@ -38,16 +42,13 @@ def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"):
self.media_repo = media_repo
self.filepaths = media_repo.filepaths
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self.server_name = hs.hostname
self.auth = hs.get_auth()
self.max_upload_size = hs.config.media.max_upload_size

async def _async_render_OPTIONS(self, request: SynapseRequest) -> None:
respond_with_json(request, 200, {}, send_cors=True)

async def _async_render_POST(self, request: SynapseRequest) -> None:
requester = await self.auth.get_user_by_req(request)
def _get_file_metadata(
self, request: SynapseRequest
) -> Tuple[int, Optional[str], str]:
raw_content_length = request.getHeader("Content-Length")
if raw_content_length is None:
raise SynapseError(msg="Request must specify a Content-Length", code=400)
Expand Down Expand Up @@ -90,6 +91,15 @@ async def _async_render_POST(self, request: SynapseRequest) -> None:
# disposition = headers.getRawHeaders(b"Content-Disposition")[0]
# TODO(markjh): parse content-dispostion

return content_length, upload_name, media_type

async def _async_render_OPTIONS(self, request: SynapseRequest) -> None:
respond_with_json(request, 200, {}, send_cors=True)

async def _async_render_POST(self, request: SynapseRequest) -> None:
requester = await self.auth.get_user_by_req(request)
content_length, upload_name, media_type = self._get_file_metadata(request)

try:
content: IO = request.content # type: ignore
content_uri = await self.media_repo.create_content(
Expand All @@ -105,3 +115,44 @@ async def _async_render_POST(self, request: SynapseRequest) -> None:
respond_with_json(
request, 200, {"content_uri": str(content_uri)}, send_cors=True
)

async def _async_render_PUT(self, request: SynapseRequest) -> None:
requester = await self.auth.get_user_by_req(request)
server_name, media_id, _ = parse_media_id(request)

if server_name != self.server_name:
raise SynapseError(
404,
"Non-local server name specified",
errcode=Codes.NOT_FOUND,
)

lock = await self.store.try_acquire_lock(_UPLOAD_MEDIA_LOCK_NAME, media_id)
if not lock:
raise SynapseError(
409,
"Media ID is is locked and cannot be uploaded to",
errcode="M_CANNOT_OVERWRITE_MEDIA",
)

async with lock:
await self.media_repo.verify_can_upload(media_id, requester.user)
content_length, upload_name, media_type = self._get_file_metadata(request)

try:
content: IO = request.content # type: ignore
await self.media_repo.update_content(
media_id,
media_type,
upload_name,
content,
content_length,
requester.user,
)
except SpamMediaException:
# For uploading of media we want to respond with a 400, instead of
# the default 404, as that would just be confusing.
raise SynapseError(400, "Bad content")

logger.info("Uploaded content to URI %r", media_id)
respond_with_json(request, 200, {}, send_cors=True)
25 changes: 25 additions & 0 deletions synapse/storage/databases/main/media_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ async def get_local_media(self, media_id: str) -> Optional[Dict[str, Any]]:
"quarantined_by",
"url_cache",
"safe_from_quarantine",
"user_id",
),
allow_none=True,
desc="get_local_media",
Expand Down Expand Up @@ -370,6 +371,30 @@ async def store_local_media(
desc="store_local_media",
)

async def update_local_media(
self,
media_id: str,
media_type: str,
upload_name: Optional[str],
media_length: int,
user_id: UserID,
url_cache: Optional[str] = None,
) -> None:
await self.db_pool.simple_update_one(
"local_media_repository",
keyvalues={
"user_id": user_id.to_string(),
"media_id": media_id,
},
updatevalues={
"media_type": media_type,
"upload_name": upload_name,
"media_length": media_length,
"url_cache": url_cache,
},
desc="update_local_media",
)

async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None:
"""Mark a local media as safe or unsafe from quarantining."""
await self.db_pool.simple_update_one(
Expand Down

0 comments on commit a4b84b5

Please sign in to comment.