diff --git a/docs/artifact-manager.md b/docs/artifact-manager.md
index f842afbe..db9ae6d8 100644
--- a/docs/artifact-manager.md
+++ b/docs/artifact-manager.md
@@ -579,15 +579,15 @@ Retrieve a list of child artifacts within a specified collection, supporting key
 - `artifact_id` (str): The id of the parent artifact or collection to list children from. It can be an uuid generated by `create` or `edit` function, or it can be an alias of the artifact under the current workspace. If you want to refer to an artifact in another workspace, you should use the full alias in the format of `"workspace_id/alias"`. If not specified, the function lists all top-level artifacts in the current workspace.
 - `keywords` (List[str], optional): A list of search terms used for fuzzy searching across all manifest fields. Each term is searched independently, and results matching any term will be included. For example, `["sample", "dataset"]` returns artifacts containing either "sample" or "dataset" in any field of the manifest.
-- `filters` (dict, optional): A dictionary where each key is a manifest field name and each value specifies the match for that field. Filters support both exact and range-based matching, depending on the field. You can filter based on the keys inside the manifest, as well as internal fields like permissions and view/download statistics by adding a dot (`.`) before the field name. Supported internal fields include:
-  - **`.type`**: Matches the artifact type exactly, e.g., `{"type": "application"}`.
-  - **`.created_by`**: Matches the exact creator ID, e.g., `{"created_by": "user123"}`.
-  - **`.created_at`** and **`last_modified`**: Accept a single timestamp (lower bound) or a range for filtering. For example, `{"created_at": [1620000000, 1630000000]}` filters artifacts created between the two timestamps.
-  - **`.view_count`** and **`download_count`**: Accept a single value or a range for filtering, as with date fields. For example, `{"view_count": [10, 100]}` filters artifacts viewed between 10 and 100 times.
-  - **`.permissions/`**: Searches for artifacts with specific permissions assigned to the given `user_id`.
-  - **`.stage`**: If `true`, returns only artifacts in staging mode. Example: `{"stage": true}`. By default, .stage filter is always set to `false`.
-  - **`any other field in manifest`**: Matches the exact value of the field, e.g., `{"name": "example-dataset"}`. These filters also support fuzzy matching if a value contains a wildcard (`*`), e.g., `{"name": "dataset*"}`, depending on the SQL backend.
-
+- `filters` (dict, optional): A dictionary where each key is a supported filter field and each value specifies the match for that field. Filters support both exact and range-based matching, depending on the field. You can filter based on the keys inside the manifest, as well as internal fields like permissions and view/download statistics. Supported filter fields include:
+  - **`type`**: Matches the artifact type exactly, e.g., `{"type": "application"}`.
+  - **`created_by`**: Matches the exact creator ID, e.g., `{"created_by": "user123"}`.
+  - **`created_at`** and **`last_modified`**: Accept a single timestamp (lower bound) or a range for filtering. For example, `{"created_at": [1620000000, 1630000000]}` filters artifacts created between the two timestamps.
+  - **`view_count`** and **`download_count`**: Accept a single value or a range for filtering, as with date fields. For example, `{"view_count": [10, 100]}` filters artifacts viewed between 10 and 100 times.
+  - **`permissions/`**: Searches for artifacts with specific permissions assigned to the given `user_id`.
+  - **`version`**: Filters by version state; it only supports `"stage"`, `"latest"`, or `"*"` (matches both staged and committed artifacts).
+  - **`manifest`**: Matches exact values of manifest fields, e.g., `"manifest": {"name": "example-dataset"}`. These filters also support fuzzy matching if a value contains a wildcard (`*`), e.g., `"manifest": {"name": "dataset*"}`, depending on the SQL backend.
+  - **`config`**: Matches the exact value of a field in the config, e.g., `"config": {"collection_schema": {"type": "object"}}`.
 - `mode` (str, optional): Defines how multiple conditions (from keywords and filters) are combined. Use `"AND"` to ensure all conditions must match, or `"OR"` to include artifacts meeting any condition. Default is `"AND"`.
 - `offset` (int, optional): The number of artifacts to skip before listing results. Default is `0`.
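
The new `version`, `manifest`, and `config` filters documented above replace the old `.stage` flag. A minimal usage sketch, mirroring the calls exercised in the updated tests further down; it assumes an existing `artifact_manager` service proxy and a `collection` artifact (e.g. obtained via `api.get_service("public/artifact-manager")` and a prior `create` call):

```python
# Staged children whose manifest description fuzzy-matches "*dataset*"
staged = await artifact_manager.list(
    parent_id=collection.id,
    filters={"version": "stage", "manifest": {"description": "*dataset*"}},
    mode="AND",
)

# Committed children only ("latest"), or both staged and committed ("*")
committed = await artifact_manager.list(
    parent_id=collection.id, filters={"version": "latest"}
)
everything = await artifact_manager.list(
    parent_id=collection.id, filters={"version": "*"}
)
```
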
diff --git a/hypha/VERSION b/hypha/VERSION
index 4d519499..0c093c3e 100644
--- a/hypha/VERSION
+++ b/hypha/VERSION
@@ -1,3 +1,3 @@
 {
-  "version": "0.20.39.post17"
+  "version": "0.20.39.post18"
 }
diff --git a/hypha/artifact.py b/hypha/artifact.py
index 539564bb..6d08c817 100644
--- a/hypha/artifact.py
+++ b/hypha/artifact.py
@@ -34,6 +34,7 @@
 from aiobotocore.session import get_session
 from fastapi import APIRouter, Depends, HTTPException
+from fastapi.responses import RedirectResponse
 from hypha.core import (
     UserInfo,
     UserPermission,
@@ -64,14 +65,14 @@ class ArtifactModel(SQLModel, table=True): # `table=True` makes it a table mode
     manifest: Optional[dict] = Field(
         default=None, sa_column=Column(JSON, nullable=True)
     )
-    staging: Optional[dict] = Field(default=None, sa_column=Column(JSON, nullable=True))
+    staging: Optional[list] = Field(default=None, sa_column=Column(JSON, nullable=True))
     download_count: float = Field(default=0.0)
     view_count: float = Field(default=0.0)
     file_count: int = Field(default=0)
     created_at: int = Field()
     created_by: Optional[str] = Field(default=None)
     last_modified: int = Field()
-    versions: Optional[dict] = Field(
+    versions: Optional[list] = Field(
         default=None, sa_column=Column(JSON, nullable=True)
     )
     config: Optional[dict] = Field(default=None, sa_column=Column(JSON, nullable=True))
@@ -146,6 +147,7 @@ def __init__(
         self.s3_controller = s3_controller
         self.workspace_bucket = workspace_bucket
         self.store = store
+        self._cache = store.get_redis_cache()
         self._vectordb_client = self.store.get_vectordb_client()
         self._openai_client = self.store.get_openai_client()
         self._cache_dir = self.store.get_cache_dir()
@@ -198,16 +200,25 @@ async def list_children(
             order_by: str = None,
             pagination: bool = False,
             silent: bool = False,
+            no_cache: bool = False,
             user_info: self.store.login_optional = Depends(self.store.login_optional),
         ):
             """List child artifacts of a specified artifact."""
             try:
+                parent_id = f"{workspace}/{artifact_alias}"
+                cache_key = f"artifact_children:{parent_id}:{offset}:{limit}:{order_by}:{keywords}:{filters}:{mode}"
+                if not no_cache:
+                    cached_results = await self._cache.get(cache_key)
+                    if cached_results:
+                        logger.info(f"Responding to list request ({parent_id}) from cache")
+                        return cached_results
                 if keywords:
                     keywords = keywords.split(",")
                 if filters:
                     filters = json.loads(filters)
-                return await self.list_children(
-                    parent_id=f"{workspace}/{artifact_alias}",
+
+                results = await self.list_children(
+                    parent_id=parent_id,
                     offset=offset,
                     limit=limit,
                     order_by=order_by,
@@ -218,6 +229,8 @@
                     silent=silent,
                     context={"user": user_info.model_dump(), "ws": workspace},
                 )
+                await self._cache.set(cache_key, results, ttl=60)
+                return results
             except KeyError:
                 raise HTTPException(status_code=404, detail="Parent artifact not found")
             except PermissionError:
@@ -243,6 +256,8 @@ async def get_file(
             silent: bool = False,
             version: str = None,
             token: str = None,
+            limit: int = 1000,
+            use_proxy: bool = False,
             user_info: self.store.login_optional = Depends(self.store.login_optional),
         ):
             """Retrieve a file within an artifact or list files in a directory."""
@@ -277,7 +292,7 @@ async def get_file(
                         s3_client,
                         s3_config["bucket"],
                         file_key.rstrip("/") + "/",
-                        max_length=1000,
+                        max_length=limit,
                     )
                     if not items:
                         raise HTTPException(
@@ -286,7 +301,6 @@ async def get_file(
                         )
                     return items
 
-                s3_client = self._create_client_async(s3_config)
                 # Increment download count unless silent
                 if not silent:
                     if artifact.config and "download_weights" in artifact.config:
@@ -304,7 +318,28 @@ async def get_file(
                             increment=download_weight,
                         )
                     await session.commit()
-                return FSFileResponse(s3_client, s3_config["bucket"], file_key)
+                if use_proxy:
+                    s3_client = self._create_client_async(s3_config)
+                    return FSFileResponse(s3_client, s3_config["bucket"], file_key)
+                else:
+                    async with self._create_client_async(
+                        s3_config,
+                    ) as s3_client:
+                        # generate presigned url, then redirect
+                        presigned_url = await s3_client.generate_presigned_url(
+                            "get_object",
+                            Params={
+                                "Bucket": s3_config["bucket"],
+                                "Key": file_key,
+                            },
+                        )
+                        if s3_config["public_endpoint_url"]:
+                            # replace the endpoint with the proxy base URL
+                            presigned_url = presigned_url.replace(
+                                s3_config["endpoint_url"],
+                                s3_config["public_endpoint_url"],
+                            )
+                        return RedirectResponse(url=presigned_url, status_code=302)
 
             except KeyError:
                 raise HTTPException(status_code=404, detail="Artifact not found")
@@ -393,6 +428,7 @@ async def _get_artifact_with_parent(self, session, artifact_id):
             )
             parent_result = await session.execute(parent_query)
             parent_artifact = parent_result.scalar_one_or_none()
+
         return artifact, parent_artifact
 
     def _generate_artifact_data(self, artifact, parent_artifact=None):
@@ -2140,8 +2176,18 @@ async def list_children(
             # Handle filter-based search with specific key-value matching
             if filters:
                 for key, value in filters.items():
-                    if key == "stage":
-                        stage = bool(value)
+                    if key == "version":
+                        assert value in [
+                            "stage",
+                            "latest",
+                            "*",
+                        ], "Invalid version value, it should be 'stage', 'latest' or '*'."
+                        if value == "stage":
+                            stage = True
+                        elif value == "latest":
+                            stage = False
+                        else:
+                            stage = None
                         continue
 
                     if key == "manifest" and isinstance(value, dict):
@@ -2223,29 +2269,32 @@ async def list_children(
                     else:
                         raise ValueError(f"Invalid filter key: {key}")
 
-            if backend == "sqlite":
-                if stage:
-                    stage_condition = and_(
-                        ArtifactModel.staging.isnot(None), text("staging != 'null'")
-                    )
-                else:
-                    stage_condition = or_(
-                        ArtifactModel.staging.is_(None), text("staging = 'null'")
-                    )
-            else:
-                if stage:
-                    stage_condition = and_(
-                        ArtifactModel.staging.isnot(None),
-                        text("staging::text != 'null'"),
-                    )
+            if stage is not None:
+                if backend == "sqlite":
+                    if stage:
+                        stage_condition = and_(
+                            ArtifactModel.staging.isnot(None),
+                            text("staging != 'null'"),
+                        )
+                    else:
+                        stage_condition = or_(
+                            ArtifactModel.staging.is_(None),
+                            text("staging = 'null'"),
+                        )
                 else:
-                    stage_condition = or_(
-                        ArtifactModel.staging.is_(None),
-                        text("staging::text = 'null'"),
-                    )
+                    if stage:
+                        stage_condition = and_(
+                            ArtifactModel.staging.isnot(None),
+                            text("staging::text != 'null'"),
+                        )
+                    else:
+                        stage_condition = or_(
+                            ArtifactModel.staging.is_(None),
+                            text("staging::text = 'null'"),
+                        )
 
-            query = query.where(stage_condition)
-            count_query = count_query.where(stage_condition)
+                query = query.where(stage_condition)
+                count_query = count_query.where(stage_condition)
 
             # Combine conditions based on mode (AND/OR)
             if conditions:
diff --git a/hypha/core/store.py b/hypha/core/store.py
index 6c5344e7..5aed377c 100644
--- a/hypha/core/store.py
+++ b/hypha/core/store.py
@@ -13,6 +13,7 @@
 from hypha_rpc.utils.schema import schema_method
 from starlette.routing import Mount
 from pydantic.fields import Field
+from aiocache.backends.redis import RedisCache
 
 from hypha import __version__
 from hypha.core import (
@@ -179,6 +180,9 @@ def __init__(
             self._redis = aioredis.FakeRedis.from_url("redis://localhost:9997/11")
 
+        self._redis_cache = RedisCache()
+        self._redis_cache.client = self._redis
+
         self._root_user = None
         self._event_bus = RedisEventBus(self._redis)
@@ -229,6 +233,9 @@ async def setup_root_user(self) -> UserInfo:
         )
         return self._root_user
 
+    def get_redis_cache(self):
+        return self._redis_cache
+
     async def load_or_create_workspace(self, user_info: UserInfo, workspace: str):
         """Setup the workspace."""
         if workspace is None:
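
The `get_redis_cache()` accessor added above is what the artifact and S3 controllers use for their new response caching. A condensed sketch of the cache-aside pattern they follow; the key name and the `compute_results` helper are illustrative only, not part of the codebase:

```python
cache = store.get_redis_cache()

async def list_children_cached(parent_id: str, no_cache: bool = False):
    cache_key = f"artifact_children:{parent_id}"
    if not no_cache:
        cached = await cache.get(cache_key)  # returns None on a cache miss
        if cached:
            return cached
    results = await compute_results(parent_id)  # placeholder for the real query
    await cache.set(cache_key, results, ttl=60)  # entries expire after 60 seconds
    return results
```
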
diff --git a/hypha/http.py b/hypha/http.py
index e8fbcaf9..74dd62d4 100644
--- a/hypha/http.py
+++ b/hypha/http.py
@@ -3,7 +3,6 @@
 import json
 import traceback
 from typing import Any
-import asyncio
 import logging
 import sys
 from pathlib import Path
diff --git a/hypha/s3.py b/hypha/s3.py
index 6d15f1a7..f2561669 100644
--- a/hypha/s3.py
+++ b/hypha/s3.py
@@ -11,8 +11,6 @@
 import zipfile
 from io import BytesIO
 
-from aiocache import Cache
-from aiocache.decorators import cached
 import botocore
 from aiobotocore.session import get_session
 from botocore.exceptions import ClientError
@@ -122,12 +120,6 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
         if self.background is not None:
             await self.background()
 
-
-@cached(
-    ttl=30,
-    cache=Cache.MEMORY,
-    key_builder=lambda *args, **kwargs: f"{args[1]}/{args[2]}@{args[3]}",
-)
 async def fetch_zip_tail(s3_client, workspace_bucket, s3_key, content_length):
     """
     Fetch the tail part of the zip file that contains the central directory.
@@ -219,6 +211,7 @@ def __init__(
         store.register_public_service(self.get_s3_service())
         store.set_s3_controller(self)
+        cache = store.get_redis_cache()
 
         router = APIRouter()
@@ -321,9 +314,13 @@ async def get_zip_file_content(
                 )
 
             # Fetch the ZIP's central directory from cache or download if not cached
-            zip_tail = await fetch_zip_tail(
-                s3_client, self.workspace_bucket, s3_key, content_length
-            )
+            cache_key = f"zip_tail:{self.workspace_bucket}:{s3_key}:{content_length}"
+            zip_tail = await cache.get(cache_key)
+            if zip_tail is None:
+                zip_tail = await fetch_zip_tail(
+                    s3_client, self.workspace_bucket, s3_key, content_length
+                )
+                await cache.set(cache_key, zip_tail, ttl=60)
 
             # Open the in-memory ZIP tail and parse it
             with zipfile.ZipFile(BytesIO(zip_tail)) as zip_file:
filters={"stage": True}, mode="AND" + parent_id=collection.id, filters={"version": "stage"}, mode="AND" ) assert len(results) == 3 @@ -909,6 +925,7 @@ async def test_http_artifact_endpoint(minio_server, fastapi_server, test_user_to assert artifact_data["download_count"] == 1 # Retrieve the collection's children to verify the dataset presence + # This will also trigger caching of the collection's children response = requests.get( f"{SERVER_URL}/{api.config.workspace}/artifacts/{collection.alias}/children" ) @@ -917,6 +934,35 @@ async def test_http_artifact_endpoint(minio_server, fastapi_server, test_user_to manifest = item["manifest"] assert "Test Dataset" == manifest["name"] + # Now let's add a new dataset to the collection + dataset_manifest = { + "name": "Test Dataset 2", + "description": "A test dataset for HTTP endpoint", + } + dataset2 = await artifact_manager.create( + type="dataset", + parent_id=collection.id, + manifest=dataset_manifest, + config={"permissions": {"*": "r", "@": "r+"}}, + ) + # now get the children again + # due to caching, the new dataset should not be in the response + response = requests.get( + f"{SERVER_URL}/{api.config.workspace}/artifacts/{collection.alias}/children" + ) + assert response.status_code == 200 + item = find_item(response.json(), "id", dataset2.id) + assert item is None + + # But we can force a refresh of the cache with 'no_cache' query parameter + response = requests.get( + f"{SERVER_URL}/{api.config.workspace}/artifacts/{collection.alias}/children?no_cache=1" + ) + assert response.status_code == 200 + item = find_item(response.json(), "id", dataset2.id) + manifest = item["manifest"] + assert "Test Dataset 2" == manifest["name"] + async def test_edit_existing_artifact(minio_server, fastapi_server, test_user_token): """Test editing an existing artifact.""" @@ -1187,7 +1233,7 @@ async def test_artifact_manager_with_collection( # Confirm the dataset's inclusion in the collection collection_items = await artifact_manager.list( - parent_id=collection.id, filters={"stage": True} + parent_id=collection.id, filters={"version": "stage"} ) item = find_item(collection_items, "id", dataset.id) assert item["manifest"]["name"] == "test-dataset"