From 08381729e338c9e325bd9e62bf1bcd5262e5215c Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Fri, 25 Mar 2022 01:01:09 -0600 Subject: [PATCH 1/7] Remove re-assignment of self.clock in UploadResource Signed-off-by: Sumner Evans --- synapse/rest/media/upload_resource.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/rest/media/upload_resource.py b/synapse/rest/media/upload_resource.py index 697348613b52..d5e419605f00 100644 --- a/synapse/rest/media/upload_resource.py +++ b/synapse/rest/media/upload_resource.py @@ -42,7 +42,6 @@ def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): self.server_name = hs.hostname self.auth = hs.get_auth() self.max_upload_size = hs.config.media.max_upload_size - self.clock = hs.get_clock() async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: respond_with_json(request, 200, {}, send_cors=True) From 50a0f9d0b05fdd1c1a51b0e023389d5f27c95508 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Fri, 15 Apr 2022 09:01:08 -0600 Subject: [PATCH 2/7] config: add option to rate limit media creation Signed-off-by: Sumner Evans --- docs/usage/configuration/config_documentation.md | 13 +++++++++++++ synapse/config/ratelimiting.py | 6 ++++++ 2 files changed, 19 insertions(+) diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 1b6f2569490e..32a3c58996ed 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -1664,6 +1664,19 @@ rc_third_party_invite: burst_count: 10 ``` --- +### `rc_media_create` + +This option ratelimits creation of MXC URIs via the MSC2246 asynchronous upload +`/_matrix/media/v1/create` endpoint based on the account that's creating the +media. Defaults to `per_second: 10`, `burst_count: 50`. + +Example configuration: +```yaml +rc_media_create: + per_second: 10 + burst_count: 50 +``` +--- ### `rc_federation` Defines limits on federation requests. diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index a5514e70a21d..577d7d41cdd7 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -160,3 +160,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: config.get("rc_third_party_invite", {}), defaults={"per_second": 0.0025, "burst_count": 5}, ) + + # Ratelimit create media requests: + self.rc_media_create = RatelimitSettings( + config.get("rc_media_create", {}), + defaults={"per_second": 10, "burst_count": 50}, + ) From 68272cbbcc7cdd4eb8f66397f90a55001d4e4cc5 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 31 May 2022 09:19:57 -0600 Subject: [PATCH 3/7] config: add option for how long to wait until media ID expires --- docs/usage/configuration/config_documentation.md | 10 ++++++++++ synapse/config/repository.py | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 32a3c58996ed..baae67ec1ad0 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -1738,6 +1738,16 @@ Example configuration: media_store_path: "DATADIR/media_store" ``` --- +### `unused_expiration_time` + +How long to wait in milliseconds before expiring created media IDs when MSC2246 +support is enabled. Defaults to "24h" + +Example configuration: +```yaml +unused_expiration_time: "1h" +``` +--- ### `media_storage_providers` Media storage providers allow media to be stored in different diff --git a/synapse/config/repository.py b/synapse/config/repository.py index ecb3edbe3ade..306eb30a28fd 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -137,6 +137,10 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M")) self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M")) + self.unused_expiration_time = self.parse_duration( + config.get("unused_expiration_time", "24h") + ) + self.media_store_path = self.ensure_directory( config.get("media_store_path", "media_store") ) From 9badcae8f1e4e821d8e701e7a530254b96453a0f Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 20 Apr 2023 01:45:32 -0600 Subject: [PATCH 4/7] media/create: add /create endpoint See matrix-org/matrix-spec-proposals#2246 for details Signed-off-by: Sumner Evans --- synapse/media/media_repository.py | 20 +++++ synapse/rest/media/create_resource.py | 82 +++++++++++++++++++ .../rest/media/media_repository_resource.py | 2 + .../databases/main/media_repository.py | 18 ++++ .../74/05_add_unused_expires_at_for_media.sql | 20 +++++ 5 files changed, 142 insertions(+) create mode 100644 synapse/rest/media/create_resource.py create mode 100644 synapse/storage/schema/main/delta/74/05_add_unused_expires_at_for_media.sql diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index b81e3c2b0c60..55ac0c82bc68 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -76,6 +76,7 @@ def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main self.max_upload_size = hs.config.media.max_upload_size self.max_image_pixels = hs.config.media.max_image_pixels + self.unused_expiration_time = hs.config.media.unused_expiration_time Thumbnailer.set_limits(self.max_image_pixels) @@ -173,6 +174,25 @@ def mark_recently_accessed(self, server_name: Optional[str], media_id: str) -> N else: self.recently_accessed_locals.add(media_id) + async def create_media_id(self, auth_user: UserID) -> Tuple[str, int]: + """Create and store a media ID for a local user and return the mxc URL + Args: + auth_user: The user_id of the uploader + Returns: + The mxc url of the stored content + """ + media_id = random_string(24) + now = self.clock.time_msec() + # After the configured amount of time, don't allow the upload to start. + unused_expires_at = now + self.unused_expiration_time + await self.store.store_local_media_id( + media_id=media_id, + time_now_ms=now, + user_id=auth_user, + unused_expires_at=unused_expires_at, + ) + return f"mxc://{self.server_name}/{media_id}", unused_expires_at + async def create_content( self, media_type: str, diff --git a/synapse/rest/media/create_resource.py b/synapse/rest/media/create_resource.py new file mode 100644 index 000000000000..a6a2120c5c8a --- /dev/null +++ b/synapse/rest/media/create_resource.py @@ -0,0 +1,82 @@ +# Copyright 2023 Beeper Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING + +from synapse.api.errors import LimitExceededError +from synapse.api.ratelimiting import Ratelimiter +from synapse.http.server import DirectServeJsonResource, respond_with_json +from synapse.http.site import SynapseRequest + +if TYPE_CHECKING: + from synapse.media.media_repository import MediaRepository + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class CreateResource(DirectServeJsonResource): + isLeaf = True + + def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): + super().__init__() + + self.media_repo = media_repo + self.clock = hs.get_clock() + self.auth = hs.get_auth() + + # A rate limiter for creating new media IDs. + self._create_media_rate_limiter = Ratelimiter( + store=hs.get_datastores().main, + clock=self.clock, + rate_hz=hs.config.ratelimiting.rc_media_create.per_second, + burst_count=hs.config.ratelimiting.rc_media_create.burst_count, + ) + + async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: + respond_with_json(request, 200, {}, send_cors=True) + + async def _async_render_POST(self, request: SynapseRequest) -> None: + requester = await self.auth.get_user_by_req(request) + + # If the create media requests for the user are over the limit, drop + # them. + allowed, time_allowed = await self._create_media_rate_limiter.can_do_action( + requester + ) + if not allowed: + time_now_s = self.clock.time() + raise LimitExceededError( + retry_after_ms=int(1000 * (time_allowed - time_now_s)) + ) + + content_uri, unused_expires_at = await self.media_repo.create_media_id( + requester.user + ) + + logger.info( + "Created Media URI %r that if unused will expire at %d", + content_uri, + unused_expires_at, + ) + respond_with_json( + request, + 200, + { + "content_uri": content_uri, + "unused_expires_at": unused_expires_at, + }, + send_cors=True, + ) diff --git a/synapse/rest/media/media_repository_resource.py b/synapse/rest/media/media_repository_resource.py index 5ebaa3b032cd..46203ee50f57 100644 --- a/synapse/rest/media/media_repository_resource.py +++ b/synapse/rest/media/media_repository_resource.py @@ -18,6 +18,7 @@ from synapse.http.server import UnrecognizedRequestResource from .config_resource import MediaConfigResource +from .create_resource import CreateResource from .download_resource import DownloadResource from .preview_url_resource import PreviewUrlResource from .thumbnail_resource import ThumbnailResource @@ -80,6 +81,7 @@ def __init__(self, hs: "HomeServer"): super().__init__() media_repo = hs.get_media_repository() + self.putChild(b"create", CreateResource(hs, media_repo)) self.putChild(b"upload", UploadResource(hs, media_repo)) self.putChild(b"download", DownloadResource(hs, media_repo)) self.putChild( diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index fa8be214ce90..1c8c69652599 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -328,6 +328,24 @@ def _get_local_media_ids_txn(txn: LoggingTransaction) -> List[str]: "get_local_media_ids", _get_local_media_ids_txn ) + async def store_local_media_id( + self, + media_id: str, + time_now_ms: int, + user_id: UserID, + unused_expires_at: int, + ) -> None: + await self.db_pool.simple_insert( + "local_media_repository", + { + "media_id": media_id, + "created_ts": time_now_ms, + "user_id": user_id.to_string(), + "unused_expires_at": unused_expires_at, + }, + desc="store_local_media_id", + ) + async def store_local_media( self, media_id: str, diff --git a/synapse/storage/schema/main/delta/74/05_add_unused_expires_at_for_media.sql b/synapse/storage/schema/main/delta/74/05_add_unused_expires_at_for_media.sql new file mode 100644 index 000000000000..2adc6071721c --- /dev/null +++ b/synapse/storage/schema/main/delta/74/05_add_unused_expires_at_for_media.sql @@ -0,0 +1,20 @@ +/* Copyright 2023 Beeper Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add new colums to the `local_media_repository` to keep track of when the +-- media ID must be used by. This is to support async uploads (see MSC2246). + +ALTER TABLE local_media_repository + ADD COLUMN unused_expires_at BIGINT DEFAULT NULL; From 0f7ddcf783ff0412c31f5ab0fd8f1ca7a49c131a Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Fri, 15 Apr 2022 19:24:54 -0600 Subject: [PATCH 5/7] changelog: add entry for async uploads Signed-off-by: Sumner Evans --- changelog.d/15460.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15460.feature diff --git a/changelog.d/15460.feature b/changelog.d/15460.feature new file mode 100644 index 000000000000..b6ca97a2cf50 --- /dev/null +++ b/changelog.d/15460.feature @@ -0,0 +1 @@ +Add support for asynchronous uploads as defined by [MSC2246](https://github.com/matrix-org/matrix-spec-proposals/pull/2246). Contributed by @sumnerevans at @beeper. From cfaeac8b15ffe69fae3c9a25a4a80c2b58746f41 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 20 Apr 2023 02:14:54 -0600 Subject: [PATCH 6/7] media/upload: add support for async uploads Signed-off-by: Sumner Evans --- synapse/media/media_repository.py | 65 +++++++++++++++++++ synapse/rest/media/upload_resource.py | 65 +++++++++++++++++-- .../databases/main/media_repository.py | 25 +++++++ 3 files changed, 148 insertions(+), 7 deletions(-) diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 55ac0c82bc68..86ef3b5ec297 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -26,6 +26,7 @@ from twisted.internet.defer import Deferred from synapse.api.errors import ( + Codes, FederationDeniedError, HttpResponseException, NotFoundError, @@ -193,6 +194,70 @@ async def create_media_id(self, auth_user: UserID) -> Tuple[str, int]: ) return f"mxc://{self.server_name}/{media_id}", unused_expires_at + async def verify_can_upload(self, media_id: str, auth_user: UserID) -> None: + """Verify that the media ID can be uploaded to by the given user. This + function checks that: + * the media ID exists + * the media ID does not already have content + * the user uploading is the same as the one who created the media ID + * the media ID has not expired + Args: + media_id: The media ID to verify + auth_user: The user_id of the uploader + """ + media = await self.store.get_local_media(media_id) + if media is None: + raise SynapseError(404, "Unknow media ID", errcode=Codes.NOT_FOUND) + + if media["user_id"] != str(auth_user): + raise SynapseError( + 403, + "Only the creator of the media ID can upload to it", + errcode=Codes.FORBIDDEN, + ) + + if media.get("media_length") is not None: + raise SynapseError( + 409, + "Media ID already has content", + errcode="M_CANNOT_OVERWRITE_MEDIA", + ) + + if media.get("unused_expires_at", 0) < self.clock.time_msec(): + raise NotFoundError("Media ID has expired") + + async def update_content( + self, + media_id: str, + media_type: str, + upload_name: Optional[str], + content: IO, + content_length: int, + auth_user: UserID, + ) -> None: + """Update the content of the given media ID. + Args: + media_id: The media ID to replace. + media_type: The content type of the file. + upload_name: The name of the file, if provided. + content: A file like object that is the content to store + content_length: The length of the content + auth_user: The user_id of the uploader + """ + file_info = FileInfo(server_name=None, file_id=media_id) + fname = await self.media_storage.store_file(content, file_info) + logger.info("Stored local media in file %r", fname) + + await self.store.update_local_media( + media_id=media_id, + media_type=media_type, + upload_name=upload_name, + media_length=content_length, + user_id=auth_user, + ) + + await self._generate_thumbnails(None, media_id, media_id, media_type) + async def create_content( self, media_type: str, diff --git a/synapse/rest/media/upload_resource.py b/synapse/rest/media/upload_resource.py index d5e419605f00..14cc558646f5 100644 --- a/synapse/rest/media/upload_resource.py +++ b/synapse/rest/media/upload_resource.py @@ -14,12 +14,13 @@ # limitations under the License. import logging -from typing import IO, TYPE_CHECKING, Dict, List, Optional +from typing import IO, TYPE_CHECKING, Dict, List, Optional, Tuple from synapse.api.errors import Codes, SynapseError from synapse.http.server import DirectServeJsonResource, respond_with_json from synapse.http.servlet import parse_bytes_from_args from synapse.http.site import SynapseRequest +from synapse.media._base import parse_media_id from synapse.media.media_storage import SpamMediaException if TYPE_CHECKING: @@ -28,6 +29,9 @@ logger = logging.getLogger(__name__) +# The name of the lock to use when uploading media. +_UPLOAD_MEDIA_LOCK_NAME = "upload_media" + class UploadResource(DirectServeJsonResource): isLeaf = True @@ -38,16 +42,13 @@ def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): self.media_repo = media_repo self.filepaths = media_repo.filepaths self.store = hs.get_datastores().main - self.clock = hs.get_clock() self.server_name = hs.hostname self.auth = hs.get_auth() self.max_upload_size = hs.config.media.max_upload_size - async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: - respond_with_json(request, 200, {}, send_cors=True) - - async def _async_render_POST(self, request: SynapseRequest) -> None: - requester = await self.auth.get_user_by_req(request) + def _get_file_metadata( + self, request: SynapseRequest + ) -> Tuple[int, Optional[str], str]: raw_content_length = request.getHeader("Content-Length") if raw_content_length is None: raise SynapseError(msg="Request must specify a Content-Length", code=400) @@ -90,6 +91,15 @@ async def _async_render_POST(self, request: SynapseRequest) -> None: # disposition = headers.getRawHeaders(b"Content-Disposition")[0] # TODO(markjh): parse content-dispostion + return content_length, upload_name, media_type + + async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: + respond_with_json(request, 200, {}, send_cors=True) + + async def _async_render_POST(self, request: SynapseRequest) -> None: + requester = await self.auth.get_user_by_req(request) + content_length, upload_name, media_type = self._get_file_metadata(request) + try: content: IO = request.content # type: ignore content_uri = await self.media_repo.create_content( @@ -105,3 +115,44 @@ async def _async_render_POST(self, request: SynapseRequest) -> None: respond_with_json( request, 200, {"content_uri": str(content_uri)}, send_cors=True ) + + async def _async_render_PUT(self, request: SynapseRequest) -> None: + requester = await self.auth.get_user_by_req(request) + server_name, media_id, _ = parse_media_id(request) + + if server_name != self.server_name: + raise SynapseError( + 404, + "Non-local server name specified", + errcode=Codes.NOT_FOUND, + ) + + lock = await self.store.try_acquire_lock(_UPLOAD_MEDIA_LOCK_NAME, media_id) + if not lock: + raise SynapseError( + 409, + "Media ID is is locked and cannot be uploaded to", + errcode="M_CANNOT_OVERWRITE_MEDIA", + ) + + async with lock: + await self.media_repo.verify_can_upload(media_id, requester.user) + content_length, upload_name, media_type = self._get_file_metadata(request) + + try: + content: IO = request.content # type: ignore + await self.media_repo.update_content( + media_id, + media_type, + upload_name, + content, + content_length, + requester.user, + ) + except SpamMediaException: + # For uploading of media we want to respond with a 400, instead of + # the default 404, as that would just be confusing. + raise SynapseError(400, "Bad content") + + logger.info("Uploaded content to URI %r", media_id) + respond_with_json(request, 200, {}, send_cors=True) diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index 1c8c69652599..56b639b4890f 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -166,6 +166,7 @@ async def get_local_media(self, media_id: str) -> Optional[Dict[str, Any]]: "quarantined_by", "url_cache", "safe_from_quarantine", + "user_id", ), allow_none=True, desc="get_local_media", @@ -370,6 +371,30 @@ async def store_local_media( desc="store_local_media", ) + async def update_local_media( + self, + media_id: str, + media_type: str, + upload_name: Optional[str], + media_length: int, + user_id: UserID, + url_cache: Optional[str] = None, + ) -> None: + await self.db_pool.simple_update_one( + "local_media_repository", + keyvalues={ + "user_id": user_id.to_string(), + "media_id": media_id, + }, + updatevalues={ + "media_type": media_type, + "upload_name": upload_name, + "media_length": media_length, + "url_cache": url_cache, + }, + desc="update_local_media", + ) + async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None: """Mark a local media as safe or unsafe from quarantining.""" await self.db_pool.simple_update_one( From 298355cf82cd29f360ec63741cf57e32cc6a5742 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Fri, 28 Apr 2023 08:07:30 -0600 Subject: [PATCH 7/7] media/{download,thumbnail}: support timeout_ms parameter Signed-off-by: Sumner Evans --- synapse/api/errors.py | 1 + synapse/media/_base.py | 6 ++ synapse/media/media_repository.py | 98 ++++++++++++++++++++---- synapse/rest/media/download_resource.py | 27 +++++-- synapse/rest/media/thumbnail_resource.py | 65 +++++++++++----- 5 files changed, 154 insertions(+), 43 deletions(-) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index f2d6f9ab2d9e..6961a013ad16 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -80,6 +80,7 @@ class Codes(str, Enum): WEAK_PASSWORD = "M_WEAK_PASSWORD" INVALID_SIGNATURE = "M_INVALID_SIGNATURE" USER_DEACTIVATED = "M_USER_DEACTIVATED" + NOT_YET_UPLOADED = "M_NOT_YET_UPLOADED" # Part of MSC3848 # https://github.com/matrix-org/matrix-spec-proposals/pull/3848 diff --git a/synapse/media/_base.py b/synapse/media/_base.py index ef8334ae2586..93c93998dcc6 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -50,6 +50,12 @@ "text/xml", ] +# Default timeout_ms for download and thumbnail requests +DEFAULT_MAX_TIMEOUT_MS = 20_000 + +# Maximum allowed timeout_ms for download and thumbnail requests +MAXIMUM_ALLOWED_MAX_TIMEOUT_MS = 60_000 + def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]: """Parses the server name, media ID and optional file name from the request URI diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 86ef3b5ec297..1c35b3de48d3 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -17,7 +17,7 @@ import os import shutil from io import BytesIO -from typing import IO, TYPE_CHECKING, Dict, List, Optional, Set, Tuple +from typing import IO, TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple from matrix_common.types.mxc_uri import MXCUri @@ -32,8 +32,10 @@ NotFoundError, RequestSendFailed, SynapseError, + cs_error, ) from synapse.config.repository import ThumbnailRequirement +from synapse.http.server import respond_with_json from synapse.http.site import SynapseRequest from synapse.logging.context import defer_to_thread from synapse.media._base import ( @@ -300,8 +302,62 @@ async def create_content( return MXCUri(self.server_name, media_id) + def respond_not_yet_uploaded(self, request: SynapseRequest) -> None: + respond_with_json( + request, + 404, + cs_error("Media has not been uploaded yet", code=Codes.NOT_YET_UPLOADED), + send_cors=True, + ) + + async def get_local_media_info( + self, request: SynapseRequest, media_id: str, max_timeout_ms: int + ) -> Optional[Dict[str, Any]]: + """Gets the info dictionary for given local media ID. If the media has + not been uploaded yet, this function will wait up to ``max_timeout_ms`` + milliseconds for the media to be uploaded. + Args: + request: The incoming request. + media_id: The media ID of the content. (This is the same as + the file_id for local content.) + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. + Returns: + Either the info dictionary for the given local media ID or + ``None``. If ``None``, then no further processing is necessary as + this function will send the necessary JSON response. + """ + wait_until = self.clock.time_msec() + max_timeout_ms + while True: + # Get the info for the media + media_info = await self.store.get_local_media(media_id) + if not media_info: + respond_404(request) + return None + + if media_info["quarantined_by"]: + logger.info("Media is quarantined") + respond_404(request) + return None + + # The file has been uploaded, so stop looping + if media_info.get("media_length") is not None: + return media_info + + if self.clock.time_msec() >= wait_until: + break + + await self.clock.sleep(0.5) + + self.respond_not_yet_uploaded(request) + return None + async def get_local_media( - self, request: SynapseRequest, media_id: str, name: Optional[str] + self, + request: SynapseRequest, + media_id: str, + name: Optional[str], + max_timeout_ms: int, ) -> None: """Responds to requests for local media, if exists, or returns 404. @@ -311,13 +367,14 @@ async def get_local_media( the file_id for local content.) name: Optional name that, if specified, will be used as the filename in the Content-Disposition header of the response. + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. Returns: Resolves once a response has successfully been written to request """ - media_info = await self.store.get_local_media(media_id) - if not media_info or media_info["quarantined_by"]: - respond_404(request) + media_info = await self.get_local_media_info(request, media_id, max_timeout_ms) + if not media_info: return self.mark_recently_accessed(None, media_id) @@ -342,6 +399,7 @@ async def get_remote_media( server_name: str, media_id: str, name: Optional[str], + max_timeout_ms: int, ) -> None: """Respond to requests for remote media. @@ -351,6 +409,8 @@ async def get_remote_media( media_id: The media ID of the content (as defined by the remote server). name: Optional name that, if specified, will be used as the filename in the Content-Disposition header of the response. + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. Returns: Resolves once a response has successfully been written to request @@ -368,11 +428,11 @@ async def get_remote_media( key = (server_name, media_id) async with self.remote_media_linearizer.queue(key): responder, media_info = await self._get_remote_media_impl( - server_name, media_id + server_name, media_id, max_timeout_ms ) # We deliberately stream the file outside the lock - if responder: + if responder and media_info: media_type = media_info["media_type"] media_length = media_info["media_length"] upload_name = name if name else media_info["upload_name"] @@ -380,15 +440,19 @@ async def get_remote_media( request, responder, media_type, media_length, upload_name ) else: - respond_404(request) + self.respond_not_yet_uploaded(request) - async def get_remote_media_info(self, server_name: str, media_id: str) -> dict: + async def get_remote_media_info( + self, server_name: str, media_id: str, max_timeout_ms: int + ) -> dict: """Gets the media info associated with the remote file, downloading if necessary. Args: server_name: Remote server_name where the media originated. media_id: The media ID of the content (as defined by the remote server). + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. Returns: The media info of the file @@ -404,7 +468,7 @@ async def get_remote_media_info(self, server_name: str, media_id: str) -> dict: key = (server_name, media_id) async with self.remote_media_linearizer.queue(key): responder, media_info = await self._get_remote_media_impl( - server_name, media_id + server_name, media_id, max_timeout_ms ) # Ensure we actually use the responder so that it releases resources @@ -415,7 +479,7 @@ async def get_remote_media_info(self, server_name: str, media_id: str) -> dict: return media_info async def _get_remote_media_impl( - self, server_name: str, media_id: str + self, server_name: str, media_id: str, max_timeout_ms: int ) -> Tuple[Optional[Responder], dict]: """Looks for media in local cache, if not there then attempt to download from remote server. @@ -424,6 +488,8 @@ async def _get_remote_media_impl( server_name: Remote server_name where the media originated. media_id: The media ID of the content (as defined by the remote server). + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. Returns: A tuple of responder and the media info of the file. @@ -454,8 +520,7 @@ async def _get_remote_media_impl( try: media_info = await self._download_remote_file( - server_name, - media_id, + server_name, media_id, max_timeout_ms ) except SynapseError: raise @@ -488,6 +553,7 @@ async def _download_remote_file( self, server_name: str, media_id: str, + max_timeout_ms: int, ) -> dict: """Attempt to download the remote file from the given server name, using the given file_id as the local id. @@ -497,7 +563,8 @@ async def _download_remote_file( media_id: The media ID of the content (as defined by the remote server). This is different than the file_id, which is locally generated. - file_id: Local file ID + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. Returns: The media info of the file. @@ -521,7 +588,8 @@ async def _download_remote_file( # tell the remote server to 404 if it doesn't # recognise the server_name, to make sure we don't # end up with a routing loop. - "allow_remote": "false" + "allow_remote": "false", + "timeout_ms": str(max_timeout_ms), }, ) except RequestSendFailed as e: diff --git a/synapse/rest/media/download_resource.py b/synapse/rest/media/download_resource.py index 8f270cf4ccb8..f9fc61aafeda 100644 --- a/synapse/rest/media/download_resource.py +++ b/synapse/rest/media/download_resource.py @@ -20,9 +20,14 @@ set_corp_headers, set_cors_headers, ) -from synapse.http.servlet import parse_boolean +from synapse.http.servlet import parse_boolean, parse_integer from synapse.http.site import SynapseRequest -from synapse.media._base import parse_media_id, respond_404 +from synapse.media._base import ( + DEFAULT_MAX_TIMEOUT_MS, + MAXIMUM_ALLOWED_MAX_TIMEOUT_MS, + parse_media_id, + respond_404, +) if TYPE_CHECKING: from synapse.media.media_repository import MediaRepository @@ -54,13 +59,17 @@ async def _async_render_GET(self, request: SynapseRequest) -> None: ) # Limited non-standard form of CSP for IE11 request.setHeader(b"X-Content-Security-Policy", b"sandbox;") - request.setHeader( - b"Referrer-Policy", - b"no-referrer", - ) + request.setHeader(b"Referrer-Policy", b"no-referrer") server_name, media_id, name = parse_media_id(request) + max_timeout_ms = parse_integer( + request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS + ) + max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS) + if server_name == self.server_name: - await self.media_repo.get_local_media(request, media_id, name) + await self.media_repo.get_local_media( + request, media_id, name, max_timeout_ms + ) else: allow_remote = parse_boolean(request, "allow_remote", default=True) if not allow_remote: @@ -72,4 +81,6 @@ async def _async_render_GET(self, request: SynapseRequest) -> None: respond_404(request) return - await self.media_repo.get_remote_media(request, server_name, media_id, name) + await self.media_repo.get_remote_media( + request, server_name, media_id, name, max_timeout_ms + ) diff --git a/synapse/rest/media/thumbnail_resource.py b/synapse/rest/media/thumbnail_resource.py index 4ee2a0dbda79..900f6714b3f6 100644 --- a/synapse/rest/media/thumbnail_resource.py +++ b/synapse/rest/media/thumbnail_resource.py @@ -28,6 +28,8 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.http.site import SynapseRequest from synapse.media._base import ( + DEFAULT_MAX_TIMEOUT_MS, + MAXIMUM_ALLOWED_MAX_TIMEOUT_MS, FileInfo, ThumbnailInfo, parse_media_id, @@ -70,26 +72,37 @@ async def _async_render_GET(self, request: SynapseRequest) -> None: method = parse_string(request, "method", "scale") # TODO Parse the Accept header to get an prioritised list of thumbnail types. m_type = "image/png" + max_timeout_ms = parse_integer( + request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS + ) + max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS) if server_name == self.server_name: if self.dynamic_thumbnails: await self._select_or_generate_local_thumbnail( - request, media_id, width, height, method, m_type + request, media_id, width, height, method, m_type, max_timeout_ms ) else: await self._respond_local_thumbnail( - request, media_id, width, height, method, m_type + request, media_id, width, height, method, m_type, max_timeout_ms ) self.media_repo.mark_recently_accessed(None, media_id) else: - if self.dynamic_thumbnails: - await self._select_or_generate_remote_thumbnail( - request, server_name, media_id, width, height, method, m_type - ) - else: - await self._respond_remote_thumbnail( - request, server_name, media_id, width, height, method, m_type - ) + remote_resp_function = ( + self._select_or_generate_remote_thumbnail + if self.dynamic_thumbnails + else self._respond_remote_thumbnail + ) + await remote_resp_function( + request, + server_name, + media_id, + width, + height, + method, + m_type, + max_timeout_ms, + ) self.media_repo.mark_recently_accessed(server_name, media_id) async def _respond_local_thumbnail( @@ -100,15 +113,12 @@ async def _respond_local_thumbnail( height: int, method: str, m_type: str, + max_timeout_ms: int, ) -> None: - media_info = await self.store.get_local_media(media_id) - + media_info = await self.media_repo.get_local_media_info( + request, media_id, max_timeout_ms + ) if not media_info: - respond_404(request) - return - if media_info["quarantined_by"]: - logger.info("Media is quarantined") - respond_404(request) return thumbnail_infos = await self.store.get_local_media_thumbnails(media_id) @@ -133,8 +143,13 @@ async def _select_or_generate_local_thumbnail( desired_height: int, desired_method: str, desired_type: str, + max_timeout_ms: int, ) -> None: - media_info = await self.store.get_local_media(media_id) + media_info = await self.media_repo.get_local_media_info( + request, media_id, max_timeout_ms + ) + if not media_info: + return if not media_info: respond_404(request) @@ -199,8 +214,13 @@ async def _select_or_generate_remote_thumbnail( desired_height: int, desired_method: str, desired_type: str, + max_timeout_ms: int, ) -> None: - media_info = await self.media_repo.get_remote_media_info(server_name, media_id) + media_info = await self.media_repo.get_remote_media_info( + server_name, media_id, max_timeout_ms + ) + if not media_info: + return thumbnail_infos = await self.store.get_remote_media_thumbnails( server_name, media_id @@ -262,11 +282,16 @@ async def _respond_remote_thumbnail( height: int, method: str, m_type: str, + max_timeout_ms: int, ) -> None: # TODO: Don't download the whole remote file # We should proxy the thumbnail from the remote server instead of # downloading the remote file and generating our own thumbnails. - media_info = await self.media_repo.get_remote_media_info(server_name, media_id) + media_info = await self.media_repo.get_remote_media_info( + server_name, media_id, max_timeout_ms + ) + if not media_info: + return thumbnail_infos = await self.store.get_remote_media_thumbnails( server_name, media_id