
Commit

improve

oeway committed Nov 24, 2024
1 parent 927796b commit cc4bf52
Showing 7 changed files with 157 additions and 59 deletions.
18 changes: 9 additions & 9 deletions docs/artifact-manager.md
@@ -579,15 +579,15 @@ Retrieve a list of child artifacts within a specified collection, supporting key
- `artifact_id` (str): The id of the parent artifact or collection to list children from. It can be a UUID generated by the `create` or `edit` function, or an alias of the artifact under the current workspace. To refer to an artifact in another workspace, use the full alias in the format `"workspace_id/alias"`. If not specified, the function lists all top-level artifacts in the current workspace.
- `keywords` (List[str], optional): A list of search terms used for fuzzy searching across all manifest fields. Each term is searched independently, and results matching any term will be included. For example, `["sample", "dataset"]` returns artifacts containing either "sample" or "dataset" in any field of the manifest.

- `filters` (dict, optional): A dictionary where each key is a manifest field name and each value specifies the match for that field. Filters support both exact and range-based matching, depending on the field. You can filter based on the keys inside the manifest, as well as internal fields like permissions and view/download statistics by adding a dot (`.`) before the field name. Supported internal fields include:
- **`.type`**: Matches the artifact type exactly, e.g., `{"type": "application"}`.
- **`.created_by`**: Matches the exact creator ID, e.g., `{"created_by": "user123"}`.
- **`.created_at`** and **`last_modified`**: Accept a single timestamp (lower bound) or a range for filtering. For example, `{"created_at": [1620000000, 1630000000]}` filters artifacts created between the two timestamps.
- **`.view_count`** and **`download_count`**: Accept a single value or a range for filtering, as with date fields. For example, `{"view_count": [10, 100]}` filters artifacts viewed between 10 and 100 times.
- **`.permissions/<user_id>`**: Searches for artifacts with specific permissions assigned to the given `user_id`.
- **`.stage`**: If `true`, returns only artifacts in staging mode. Example: `{"stage": true}`. By default, .stage filter is always set to `false`.
- **`any other field in manifest`**: Matches the exact value of the field, e.g., `{"name": "example-dataset"}`. These filters also support fuzzy matching if a value contains a wildcard (`*`), e.g., `{"name": "dataset*"}`, depending on the SQL backend.

- `filters` (dict, optional): A dictionary where each key is a supported filter field and each value specifies the match for that field. Filters support both exact and range-based matching, depending on the field. You can filter on internal fields such as permissions and view/download statistics, as well as on manifest and config contents via the `manifest` and `config` keys. The supported fields are listed below (see the example call after this parameter list):
- **`type`**: Matches the artifact type exactly, e.g., `{"type": "application"}`.
- **`created_by`**: Matches the exact creator ID, e.g., `{"created_by": "user123"}`.
- **`created_at`** and **`last_modified`**: Accept a single timestamp (lower bound) or a range for filtering. For example, `{"created_at": [1620000000, 1630000000]}` filters artifacts created between the two timestamps.
- **`view_count`** and **`download_count`**: Accept a single value or a range for filtering, as with date fields. For example, `{"view_count": [10, 100]}` filters artifacts viewed between 10 and 100 times.
- **`permissions/<user_id>`**: Searches for artifacts with specific permissions assigned to the given `user_id`.
- **`version`**: Matches the artifact version; only `"stage"`, `"committed"`, or `"*"` (both staged and committed) are supported.
- **`manifest`**: Matches exact values of fields inside the manifest, e.g., `"manifest": {"name": "example-dataset"}`. These filters also support fuzzy matching when a value contains a wildcard (`*`), e.g., `"manifest": {"name": "dataset*"}`, depending on the SQL backend.
- **`config`**: Matches the exact value of the field in the config, e.g., `"config": {"collection_schema": {"type": "object"}}`.
- `mode` (str, optional): Defines how multiple conditions (from keywords and filters) are combined. Use `"AND"` to ensure all conditions must match, or `"OR"` to include artifacts meeting any condition. Default is `"AND"`.

- `offset` (int, optional): The number of artifacts to skip before listing results. Default is `0`.
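For example, a combined keyword and filter query could look like the following minimal sketch. It assumes an established `hypha_rpc` connection named `server`, that the artifact manager service is available as `"public/artifact-manager"`, and that the documented `list` function is exposed on it; the collection alias and filter values are illustrative only.

```python
# Minimal sketch: list committed children of a collection whose manifest name
# starts with "dataset" and that were created after a given timestamp.
# Assumes `server` is an existing hypha_rpc connection; the collection alias
# "my-workspace/my-collection" and the service id are illustrative.
artifact_manager = await server.get_service("public/artifact-manager")

results = await artifact_manager.list(
    artifact_id="my-workspace/my-collection",  # parent collection ("workspace_id/alias" form)
    keywords=["sample", "dataset"],            # fuzzy search across manifest fields
    filters={
        "version": "committed",                # exclude staged artifacts
        "created_at": 1620000000,              # a single timestamp acts as a lower bound
        "manifest": {"name": "dataset*"},      # wildcard match on a manifest field
    },
    mode="AND",                                # all conditions must match
    offset=0,
    limit=10,
)
for item in results:
    print(item["alias"], item["manifest"].get("name"))
```

With `mode="AND"` every keyword and filter condition must match; switching to `mode="OR"` returns artifacts that satisfy any of them.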
2 changes: 1 addition & 1 deletion hypha/VERSION
@@ -1,3 +1,3 @@
{
"version": "0.20.39.post17"
"version": "0.20.39.post18"
}
109 changes: 79 additions & 30 deletions hypha/artifact.py
@@ -34,6 +34,7 @@
from aiobotocore.session import get_session

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import RedirectResponse
from hypha.core import (
UserInfo,
UserPermission,
@@ -64,14 +65,14 @@ class ArtifactModel(SQLModel, table=True): # `table=True` makes it a table mode
manifest: Optional[dict] = Field(
default=None, sa_column=Column(JSON, nullable=True)
)
staging: Optional[dict] = Field(default=None, sa_column=Column(JSON, nullable=True))
staging: Optional[list] = Field(default=None, sa_column=Column(JSON, nullable=True))
download_count: float = Field(default=0.0)
view_count: float = Field(default=0.0)
file_count: int = Field(default=0)
created_at: int = Field()
created_by: Optional[str] = Field(default=None)
last_modified: int = Field()
versions: Optional[dict] = Field(
versions: Optional[list] = Field(
default=None, sa_column=Column(JSON, nullable=True)
)
config: Optional[dict] = Field(default=None, sa_column=Column(JSON, nullable=True))
@@ -146,6 +147,7 @@ def __init__(
self.s3_controller = s3_controller
self.workspace_bucket = workspace_bucket
self.store = store
self._cache = store.get_redis_cache()
self._vectordb_client = self.store.get_vectordb_client()
self._openai_client = self.store.get_openai_client()
self._cache_dir = self.store.get_cache_dir()
@@ -198,16 +200,25 @@ async def list_children(
order_by: str = None,
pagination: bool = False,
silent: bool = False,
no_cache: bool = False,
user_info: self.store.login_optional = Depends(self.store.login_optional),
):
"""List child artifacts of a specified artifact."""
try:
parent_id = f"{workspace}/{artifact_alias}"
cache_key = f"artifact_children:{parent_id}:{offset}:{limit}:{order_by}:{keywords}:{filters}:{mode}"
if not no_cache:
cached_results = await self._cache.get(cache_key)
if cached_results:
logger.info(f"Responding to list request ({parent_id}) from cache")
return cached_results
if keywords:
keywords = keywords.split(",")
if filters:
filters = json.loads(filters)
return await self.list_children(
parent_id=f"{workspace}/{artifact_alias}",

results = await self.list_children(
parent_id=parent_id,
offset=offset,
limit=limit,
order_by=order_by,
@@ -218,6 +229,8 @@
silent=silent,
context={"user": user_info.model_dump(), "ws": workspace},
)
await self._cache.set(cache_key, results, ttl=60)
return results
except KeyError:
raise HTTPException(status_code=404, detail="Parent artifact not found")
except PermissionError:
@@ -243,6 +256,8 @@ async def get_file(
silent: bool = False,
version: str = None,
token: str = None,
limit: int = 1000,
use_proxy: bool = False,
user_info: self.store.login_optional = Depends(self.store.login_optional),
):
"""Retrieve a file within an artifact or list files in a directory."""
@@ -277,7 +292,7 @@
s3_client,
s3_config["bucket"],
file_key.rstrip("/") + "/",
max_length=1000,
max_length=limit,
)
if not items:
raise HTTPException(
@@ -286,7 +301,6 @@
)
return items

s3_client = self._create_client_async(s3_config)
# Increment download count unless silent
if not silent:
if artifact.config and "download_weights" in artifact.config:
@@ -304,7 +318,28 @@
increment=download_weight,
)
await session.commit()
return FSFileResponse(s3_client, s3_config["bucket"], file_key)
if use_proxy:
s3_client = self._create_client_async(s3_config)
return FSFileResponse(s3_client, s3_config["bucket"], file_key)
else:
async with self._create_client_async(
s3_config,
) as s3_client:
# generate presigned url, then redirect
presigned_url = await s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": s3_config["bucket"],
"Key": file_key,
},
)
if s3_config["public_endpoint_url"]:
# replace the endpoint with the proxy base URL
presigned_url = presigned_url.replace(
s3_config["endpoint_url"],
s3_config["public_endpoint_url"],
)
return RedirectResponse(url=presigned_url, status_code=302)

except KeyError:
raise HTTPException(status_code=404, detail="Artifact not found")
@@ -393,6 +428,7 @@ async def _get_artifact_with_parent(self, session, artifact_id):
)
parent_result = await session.execute(parent_query)
parent_artifact = parent_result.scalar_one_or_none()

return artifact, parent_artifact

def _generate_artifact_data(self, artifact, parent_artifact=None):
@@ -2140,8 +2176,18 @@ async def list_children(
# Handle filter-based search with specific key-value matching
if filters:
for key, value in filters.items():
if key == "stage":
stage = bool(value)
if key == "version":
assert value in [
"stage",
"latest",
"*",
], "Invalid version value, it should be 'stage' or 'latest'."
if value == "stage":
stage = True
elif value == "latest":
stage = False
else:
stage = None
continue

if key == "manifest" and isinstance(value, dict):
@@ -2223,29 +2269,32 @@
else:
raise ValueError(f"Invalid filter key: {key}")

if backend == "sqlite":
if stage:
stage_condition = and_(
ArtifactModel.staging.isnot(None), text("staging != 'null'")
)
else:
stage_condition = or_(
ArtifactModel.staging.is_(None), text("staging = 'null'")
)
else:
if stage:
stage_condition = and_(
ArtifactModel.staging.isnot(None),
text("staging::text != 'null'"),
)
if stage is not None:
if backend == "sqlite":
if stage:
stage_condition = and_(
ArtifactModel.staging.isnot(None),
text("staging != 'null'"),
)
else:
stage_condition = or_(
ArtifactModel.staging.is_(None),
text("staging = 'null'"),
)
else:
stage_condition = or_(
ArtifactModel.staging.is_(None),
text("staging::text = 'null'"),
)
if stage:
stage_condition = and_(
ArtifactModel.staging.isnot(None),
text("staging::text != 'null'"),
)
else:
stage_condition = or_(
ArtifactModel.staging.is_(None),
text("staging::text = 'null'"),
)

query = query.where(stage_condition)
count_query = count_query.where(stage_condition)
query = query.where(stage_condition)
count_query = count_query.where(stage_condition)

# Combine conditions based on mode (AND/OR)
if conditions:
7 changes: 7 additions & 0 deletions hypha/core/store.py
@@ -13,6 +13,7 @@
from hypha_rpc.utils.schema import schema_method
from starlette.routing import Mount
from pydantic.fields import Field
from aiocache.backends.redis import RedisCache

from hypha import __version__
from hypha.core import (
@@ -179,6 +180,9 @@

self._redis = aioredis.FakeRedis.from_url("redis://localhost:9997/11")

self._redis_cache = RedisCache()
self._redis_cache.client = self._redis

self._root_user = None
self._event_bus = RedisEventBus(self._redis)

@@ -229,6 +233,9 @@ async def setup_root_user(self) -> UserInfo:
)
return self._root_user

def get_redis_cache(self):
return self._redis_cache

async def load_or_create_workspace(self, user_info: UserInfo, workspace: str):
"""Setup the workspace."""
if workspace is None:
1 change: 0 additions & 1 deletion hypha/http.py
@@ -3,7 +3,6 @@
import json
import traceback
from typing import Any
import asyncio
import logging
import sys
from pathlib import Path
19 changes: 8 additions & 11 deletions hypha/s3.py
@@ -11,8 +11,6 @@
import zipfile
from io import BytesIO

from aiocache import Cache
from aiocache.decorators import cached
import botocore
from aiobotocore.session import get_session
from botocore.exceptions import ClientError
@@ -122,12 +120,6 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if self.background is not None:
await self.background()


@cached(
ttl=30,
cache=Cache.MEMORY,
key_builder=lambda *args, **kwargs: f"{args[1]}/{args[2]}@{args[3]}",
)
async def fetch_zip_tail(s3_client, workspace_bucket, s3_key, content_length):
"""
Fetch the tail part of the zip file that contains the central directory.
@@ -219,6 +211,7 @@

store.register_public_service(self.get_s3_service())
store.set_s3_controller(self)
cache = store.get_redis_cache()

router = APIRouter()

@@ -321,9 +314,13 @@ async def get_zip_file_content(
)

# Fetch the ZIP's central directory from cache or download if not cached
zip_tail = await fetch_zip_tail(
s3_client, self.workspace_bucket, s3_key, content_length
)
cache_key = f"zip_tail:{self.workspace_bucket}:{s3_key}:{content_length}"
zip_tail = await cache.get(cache_key)
if zip_tail is None:
zip_tail = await fetch_zip_tail(
s3_client, self.workspace_bucket, s3_key, content_length
)
await cache.set(cache_key, zip_tail, ttl=60)

# Open the in-memory ZIP tail and parse it
with zipfile.ZipFile(BytesIO(zip_tail)) as zip_file: