From 9a010d67d4142877e6ea74c39ae93839e8e6bf33 Mon Sep 17 00:00:00 2001
From: Wei Ouyang
Date: Sun, 24 Nov 2024 20:58:29 -0800
Subject: [PATCH] Fix zip support

---
 docs/artifact-manager.md | 233 +++++++++++++++---
 hypha/apps.py            |   2 +
 hypha/artifact.py        | 499 ++++++++++++++++++++++++++++++---------
 hypha/s3.py              | 125 ----------
 tests/test_artifact.py   | 147 +++++++++++-
 tests/test_s3.py         |  79 -------
 6 files changed, 728 insertions(+), 357 deletions(-)

diff --git a/docs/artifact-manager.md b/docs/artifact-manager.md
index db9ae6d8..5a429e09 100644
--- a/docs/artifact-manager.md
+++ b/docs/artifact-manager.md
@@ -706,70 +706,227 @@ datasets = await artifact_manager.list(collection.id)
 print("Datasets in the gallery:", datasets)
 ```
 
-
 ## HTTP API for Accessing Artifacts and Download Counts
 
-The `Artifact Manager` provides an HTTP endpoint for retrieving artifact manifests, data, and download statistics. This is useful for public-facing web applications that need to access datasets, models, or applications.
+The `Artifact Manager` provides an HTTP API for retrieving artifact manifests, data, and file statistics, and for managing zip files. These endpoints are designed for public-facing web applications that need to interact with datasets, models, or applications.
 
-### Endpoints:
+---
+### Artifact Metadata and File Access Endpoints
 
- - `/{workspace}/artifacts/{artifact_alias}` for fetching the artifact manifest.
- - `/{workspace}/artifacts/{artifact_alias}/children` for listing all artifacts in a collection.
- - `/{workspace}/artifacts/{artifact_alias}/files` for listing all files in the artifact.
- - `/{workspace}/artifacts/{artifact_alias}/files/{file_path:path}` for downloading a file from the artifact (will be redirected to a pre-signed URL).
+#### Endpoints:
 
+- `/{workspace}/artifacts/{artifact_alias}`: Fetch the artifact manifest.
+- `/{workspace}/artifacts/{artifact_alias}/children`: List all artifacts in a collection.
+- `/{workspace}/artifacts/{artifact_alias}/files`: List all files in the artifact.
+- `/{workspace}/artifacts/{artifact_alias}/files/{file_path:path}`: Download a file from the artifact (redirects to a pre-signed URL).
 
-### Request Format:
+#### Request Format:
 
 - **Method**: `GET`
-- **Headers**:
-  - `Authorization`: Optional. The user's token for accessing private artifacts (obtained via the login logic or created by `api.generate_token()`). Not required for public artifacts.
-
-### Path Parameters:
+- **Headers**:
+  - `Authorization`: Optional. The user's token for accessing private artifacts (obtained via the login logic or created by `api.generate_token()`). Not required for public artifacts.
 
-The path parameters are used to specify the artifact or file to access. The following parameters are supported:
+#### Path Parameters:
 
 - **workspace**: The workspace in which the artifact is stored.
-- **artifact_alias**: The alias or id of the artifact to access. This can be an artifact id generated by `create` or `edit` function, or it can be an alias of the artifact under the current workspace. Note that this artifact_alias should not contain the workspace.
-- **file_path**: Optional, the relative path to a file within the artifact. This is optional and only required when downloading a file.
+- **artifact_alias**: The alias or ID of the artifact to access. This can be an ID generated by the `create` or `edit` functions, or an alias under the current workspace; it must not include the workspace prefix.
+- **file_path**: (Optional) The relative path to a file within the artifact; only required when downloading a file.
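+
+For example, to fetch a public artifact's manifest (a minimal sketch mirroring the earlier `example-dataset` examples; the response shape is shown below):
+
+```python
+import requests
+
+SERVER_URL = "https://hypha.aicell.io"
+workspace = "my-workspace"
+
+response = requests.get(f"{SERVER_URL}/{workspace}/artifacts/example-dataset")
+if response.ok:
+    artifact = response.json()
+    print(artifact["manifest"]["name"])  # e.g. "Example Dataset"
+    print(artifact["download_count"])    # cumulative download count
+```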
+ +#### Response Examples: + +- **Artifact Manifest**: + ```json + { + "manifest": { + "name": "Example Dataset", + "description": "A dataset for testing.", + "version": "1.0.0" + }, + "view_count": 150, + "download_count": 25 + } + ``` + +- **Files in Artifact**: + ```json + [ + {"name": "example.txt", "type": "file"}, + {"name": "nested", "type": "directory"} + ] + ``` + +- **Download File**: A redirect to a pre-signed URL for the file. -### Query Parameters: - -Qury parameters are passed after the `?` in the URL and are used to control the behavior of the API. The following query parameters are supported: +--- -- **stage**: A boolean flag to fetch the staged version of the manifest. Default is `False`. -- **silent**: A boolean flag to suppress the view count increment. Default is `False`. +### Dynamic Zip File Creation Endpoint -- **keywords**: A list of search terms used for fuzzy searching across all manifest fields, separated by commas. -- **filters**: A dictionary of filters to apply to the search, in the format of a JSON string. -- **mode**: The mode for combining multiple conditions. Default is `AND`. -- **offset**: The number of artifacts to skip before listing results. Default is `0`. -- **limit**: The maximum number of artifacts to return. Default is `100`. -- **order_by**: The field used to order results. Default is ascending by id. -- **silent**: A boolean flag to prevent incrementing the view count for the parent artifact when listing children, listing files, or reading the artifact. Default is `False`. +#### Endpoint: -### Response: +- `/{workspace}/artifacts/{artifact_alias}/create-zip-file`: Stream a dynamically created zip file containing selected or all files in the artifact. -For `/{workspace}/artifacts/{artifact_alias}`, the response will be a JSON object representing the artifact manifest. For `/{workspace}/artifacts/{artifact_alias}/__files__/{file_path:path}`, the response will be a pre-signed URL to download the file. The artifact manifest will also include any metadata such as download statistics, e.g. `view_count`, `download_count`. For private artifacts, make sure if the user has the necessary permissions. +#### Request Format: -For `/{workspace}/artifacts/{artifact_alias}/children`, the response will be a list of artifacts in the collection. +- **Method**: `GET` +- **Query Parameters**: + - **file**: (Optional) A list of files to include in the zip file. If omitted, all files in the artifact are included. + - **token**: (Optional) User token for private artifact access. + - **version**: (Optional) The version of the artifact to fetch files from. -For `/{workspace}/artifacts/{artifact_alias}/files`, the response will be a list of files in the artifact, each file is a dictionary with the `name` and `type` fields. +#### Response: -For `/{workspace}/artifacts/{artifact_alias}/files/{file_path:path}`, the response will be a pre-signed URL to download the file. +- Streams the zip file back to the client. +- **Headers**: + - `Content-Disposition`: Attachment with the artifact alias as the filename. 
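+
+To include only specific files, repeat the `file` query parameter (e.g. `?file=example.txt&file=nested/example2.txt`); see the example below.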
-### Example: Fetching a public artifact with download statistics
+#### Example Usage:
 
 ```python
 import requests
 
 SERVER_URL = "https://hypha.aicell.io"
 workspace = "my-workspace"
-response = requests.get(f"{SERVER_URL}/{workspace}/artifacts/example-dataset")
-if response.ok:
-    artifact = response.json()
-    print(artifact["manifest"]["name"])  # Output: Example Dataset
-    print(artifact["download_count"])  # Output: Download count for the dataset
+artifact_alias = "example-dataset"
+files = ["example.txt", "nested/example2.txt"]
+
+response = requests.get(
+    f"{SERVER_URL}/{workspace}/artifacts/{artifact_alias}/create-zip-file",
+    params={"file": files},
+    stream=True,
+)
+if response.status_code == 200:
+    with open("artifact_files.zip", "wb") as f:
+        for chunk in response.iter_content(chunk_size=8192):
+            f.write(chunk)
+    print("Zip file created successfully.")
 else:
     print(f"Error: {response.status_code}")
 ```
+
+---
+
+### Zip File Access Endpoints
+
+These endpoints allow direct access to zip file contents stored in the artifact without requiring the entire zip file to be downloaded or extracted.
+
+#### Endpoints:
+
+1. **`/{workspace}/artifacts/{artifact_alias}/zip-files/{zip_file_path:path}?path=...`**
+   - Access the contents of a zip file, specifying the path within the zip file using a query parameter (`?path=`).
+
+2. **`/{workspace}/artifacts/{artifact_alias}/zip-files/{zip_file_path:path}/~/{path:path}`**
+   - Access the contents of a zip file, separating the zip file path and the internal path using `/~/`.
+
+---
+
+#### Endpoint 1: `/{workspace}/artifacts/{artifact_alias}/zip-files/{zip_file_path:path}?path=...`
+
+##### Functionality:
+
+- **If `path` ends with `/`:** Lists the contents of the directory specified by `path` inside the zip file.
+- **If `path` specifies a file:** Streams the file content from the zip.
+
+##### Request Format:
+
+- **Method**: `GET`
+- **Path Parameters**:
+  - **workspace**: The workspace in which the artifact is stored.
+  - **artifact_alias**: The alias or ID of the artifact to access.
+  - **zip_file_path**: Path to the zip file within the artifact.
+- **Query Parameters**:
+  - **path**: (Optional) The relative path inside the zip file. Defaults to the root directory.
+
+##### Response Examples:
+
+1. **Listing Directory Contents**:
+   ```json
+   [
+     {"type": "file", "name": "example.txt", "size": 123, "last_modified": 1732363845.0},
+     {"type": "directory", "name": "nested"}
+   ]
+   ```
+
+2. **Fetching a File**:
+   Streams the file content from the zip.
+
+---
+
+#### Endpoint 2: `/{workspace}/artifacts/{artifact_alias}/zip-files/{zip_file_path:path}/~/{path:path}`
+
+##### Functionality:
+
+- **If `path` ends with `/`:** Lists the contents of the directory specified by `path` inside the zip file.
+- **If `path` specifies a file:** Streams the file content from the zip.
+
+##### Request Format:
+
+- **Method**: `GET`
+- **Path Parameters**:
+  - **workspace**: The workspace in which the artifact is stored.
+  - **artifact_alias**: The alias or ID of the artifact to access.
+  - **zip_file_path**: Path to the zip file within the artifact.
+  - **path**: (Optional) The relative path inside the zip file. Defaults to the root directory.
+
+##### Response Examples:
+
+1. **Listing Directory Contents**:
+   ```json
+   [
+     {"type": "file", "name": "example.txt", "size": 123, "last_modified": 1732363845.0},
+     {"type": "directory", "name": "nested"}
+   ]
+   ```
+
+2. **Fetching a File**:
+   Streams the file content from the zip.
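+
+Both endpoints behave identically; the `/~/` form simply embeds the in-zip path in the URL instead of passing it as a query parameter.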
+ +--- + +#### Example Usage for Both Endpoints + +##### Listing Directory Contents: + +```python +import requests + +SERVER_URL = "https://hypha.aicell.io" +workspace = "my-workspace" +artifact_alias = "example-dataset" +zip_file_path = "example.zip" + +# Using the query parameter method +response = requests.get( + f"{SERVER_URL}/{workspace}/artifacts/{artifact_alias}/zip-files/{zip_file_path}", + params={"path": "nested/"} +) +print(response.json()) + +# Using the tilde method +response = requests.get( + f"{SERVER_URL}/{workspace}/artifacts/{artifact_alias}/zip-files/{zip_file_path}/~/nested/" +) +print(response.json()) +``` + +##### Fetching a File: + +```python +# Using the query parameter method +response = requests.get( + f"{SERVER_URL}/{workspace}/artifacts/{artifact_alias}/zip-files/{zip_file_path}", + params={"path": "nested/example2.txt"}, + stream=True, +) +with open("example2.txt", "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + +# Using the tilde method +response = requests.get( + f"{SERVER_URL}/{workspace}/artifacts/{artifact_alias}/zip-files/{zip_file_path}/~/nested/example2.txt", + stream=True, +) +with open("example2.txt", "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) +``` diff --git a/hypha/apps.py b/hypha/apps.py index a4a30359..c0db0582 100644 --- a/hypha/apps.py +++ b/hypha/apps.py @@ -380,6 +380,7 @@ async def start( + f"&server_url={server_url}" + (f"&token={token}" if token else "") + (f"&version={version}" if version else "") + + (f"&use_proxy=true") ) server_url = self.public_base_url public_url = ( @@ -389,6 +390,7 @@ async def start( + f"&server_url={server_url}" + (f"&token={token}" if token else "") + (f"&version={version}" if version else "") + + (f"&use_proxy=true") ) runner = random.choice(self._runner) diff --git a/hypha/artifact.py b/hypha/artifact.py index 44b3d316..f052b5ce 100644 --- a/hypha/artifact.py +++ b/hypha/artifact.py @@ -6,6 +6,8 @@ import numpy as np import re import json +from io import BytesIO +import zipfile import asyncio from sqlalchemy import ( event, @@ -39,7 +41,12 @@ from aiobotocore.session import get_session from fastapi import APIRouter, Depends, HTTPException, Query -from fastapi.responses import RedirectResponse, StreamingResponse +from fastapi.responses import ( + RedirectResponse, + StreamingResponse, + JSONResponse, + Response, +) from hypha.core import ( UserInfo, UserPermission, @@ -134,6 +141,22 @@ def set_nested_value(dictionary, field, value): dictionary[keys[-1]] = value +async def fetch_zip_tail(s3_client, workspace_bucket, s3_key, content_length): + """ + Fetch the tail part of the zip file that contains the central directory. + This result is cached to avoid re-fetching the central directory multiple times. 
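+    Only the last 64 KB of the object are requested, which typically covers the
+    end-of-central-directory record and the central directory.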
+ """ + central_directory_offset = max(content_length - 65536, 0) + range_header = f"bytes={central_directory_offset}-{content_length}" + + # Fetch the last part of the ZIP file that contains the central directory + response = await s3_client.get_object( + Bucket=workspace_bucket, Key=s3_key, Range=range_header + ) + zip_tail = await response["Body"].read() + return zip_tail + + class ArtifactController: """Artifact Controller using SQLAlchemy for database backend and S3 for file storage.""" @@ -252,7 +275,7 @@ async def list_children( status_code=500, detail=f"An unexpected error occurred: {str(e)}" ) - @router.get("/{workspace}/artifacts/{artifact_alias}/zip-files") + @router.get("/{workspace}/artifacts/{artifact_alias}/create-zip-file") async def create_zip_file( workspace: str, artifact_alias: str, @@ -271,133 +294,389 @@ async def create_zip_file( if token: user_info = await self.store.parse_user_token(token) - async with session.begin(): - # Fetch artifact and check permissions - ( - artifact, - parent_artifact, - ) = await self._get_artifact_with_permission( - user_info, artifact_id, "get_file", session - ) - version_index = self._get_version_index(artifact, version) - s3_config = self._get_s3_config(artifact, parent_artifact) + try: + async with session.begin(): + # Fetch artifact and check permissions + ( + artifact, + parent_artifact, + ) = await self._get_artifact_with_permission( + user_info, artifact_id, "get_file", session + ) + except Exception as e: + raise e + finally: + await session.close() - async with self._create_client_async(s3_config) as s3_client: - if files is None: - # List all files in the artifact - root_dir_key = safe_join( - s3_config["prefix"], - workspace, - f"{self._artifacts_dir}/{artifact.id}/v{version_index}", - ) + version_index = self._get_version_index(artifact, version) + s3_config = self._get_s3_config(artifact, parent_artifact) - async def list_all_files(dir_path=""): - try: - dir_key = f"{root_dir_key}/{dir_path}".strip("/") - items = await list_objects_async( - s3_client, - s3_config["bucket"], - dir_key + "/", - ) - for item in items: - item_path = f"{dir_path}/{item['name']}".strip( - "/" - ) - if item["type"] == "file": - yield item_path - elif item["type"] == "directory": - async for sub_item in list_all_files( - item_path - ): - yield sub_item - except Exception as e: - logger.error(f"Error listing files: {str(e)}") - raise HTTPException( - status_code=500, detail="Error listing files" - ) + async with self._create_client_async(s3_config) as s3_client: + if files is None: + # List all files in the artifact + root_dir_key = safe_join( + s3_config["prefix"], + workspace, + f"{self._artifacts_dir}/{artifact.id}/v{version_index}", + ) - files = list_all_files() - else: + async def list_all_files(dir_path=""): + try: + dir_key = f"{root_dir_key}/{dir_path}".strip("/") + items = await list_objects_async( + s3_client, + s3_config["bucket"], + dir_key + "/", + ) + for item in items: + item_path = f"{dir_path}/{item['name']}".strip("/") + if item["type"] == "file": + yield item_path + elif item["type"] == "directory": + async for sub_item in list_all_files(item_path): + yield sub_item + except Exception as e: + logger.error(f"Error listing files: {str(e)}") + raise HTTPException( + status_code=500, detail="Error listing files" + ) + + files = list_all_files() + else: + + async def validate_files(files): + for file in files: + yield file - async def validate_files(files): - for file in files: - yield file + files = validate_files(files) - files = 
validate_files(files) + logger.info(f"Creating ZIP file for artifact: {artifact_alias}") - async def file_stream_generator(presigned_url: str): - """Fetch file content from presigned URL in chunks.""" + async def file_stream_generator(presigned_url: str): + """Fetch file content from presigned URL in chunks.""" + try: + async with httpx.AsyncClient() as client: + async with client.stream( + "GET", presigned_url + ) as response: + if response.status_code != 200: + logger.error( + f"Failed to fetch file from URL: {presigned_url}, Status: {response.status_code}" + ) + raise HTTPException( + status_code=404, + detail=f"Failed to fetch file: {presigned_url}", + ) + async for chunk in response.aiter_bytes( + 1024 * 64 + ): # 64KB chunks + logger.debug( + f"Yielding chunk of size: {len(chunk)}" + ) + yield chunk + except Exception as e: + logger.error(f"Error fetching file stream: {str(e)}") + raise HTTPException( + status_code=500, + detail="Error fetching file content", + ) + + async def member_files(): + """Yield file metadata and content for stream_zip.""" + modified_at = datetime.now() + mode = S_IFREG | 0o600 + download_updates = {} + if artifact.config and "download_weights" in artifact.config: + download_weights = artifact.config.get( + "download_weights", {} + ) + else: + download_weights = {} + async for path in files: + file_key = safe_join( + s3_config["prefix"], + workspace, + f"{self._artifacts_dir}/{artifact.id}/v{version_index}", + path, + ) + logger.info(f"Adding file to ZIP: {file_key}") try: - async with httpx.AsyncClient() as client: - async with client.stream( - "GET", presigned_url - ) as response: - if response.status_code != 200: - logger.error( - f"Failed to fetch file from URL: {presigned_url}, Status: {response.status_code}" - ) - raise HTTPException( - status_code=404, - detail=f"Failed to fetch file: {presigned_url}", - ) - async for chunk in response.aiter_bytes( - 1024 * 64 - ): # 64KB chunks - yield chunk + presigned_url = await s3_client.generate_presigned_url( + "get_object", + Params={ + "Bucket": s3_config["bucket"], + "Key": file_key, + }, + ) + # Increment download count unless silent + if not silent: + download_weight = download_weights.get(path) or 0 + if download_weight > 0: + download_updates[path] = download_weight + + yield ( + path, + modified_at, + mode, + ZIP_32, + file_stream_generator(presigned_url), + ) except Exception as e: - logger.error(f"Error fetching file stream: {str(e)}") + logger.error(f"Error processing file {path}: {str(e)}") raise HTTPException( status_code=500, - detail="Error fetching file content", + detail=f"Error processing file: {path}", ) - async def member_files(): - """Yield file metadata and content for stream_zip.""" - modified_at = datetime.now() - mode = S_IFREG | 0o600 - async for path in files: - file_key = safe_join( - s3_config["prefix"], - workspace, - f"{self._artifacts_dir}/{artifact.id}/v{version_index}", - path, + if download_updates: + logger.info( + f"Bumping download count for artifact: {artifact_alias}" ) try: - presigned_url = ( - await s3_client.generate_presigned_url( - "get_object", - Params={ - "Bucket": s3_config["bucket"], - "Key": file_key, - }, - ) - ) - yield ( - path, - modified_at, - mode, - ZIP_32, - file_stream_generator(presigned_url), - ) + async with session.begin(): + for path in download_updates.keys(): + await self._increment_stat( + session, + artifact.id, + "download_count", + increment=download_updates[path], + ) + await session.commit() except Exception as e: logger.error( - f"Error 
processing file {path}: {str(e)}" - ) - raise HTTPException( - status_code=500, - detail=f"Error processing file: {path}", + f"Error bumping download count for artifact ({artifact_alias}): {str(e)}" ) + raise e + finally: + await session.close() + + # Return the ZIP file as a streaming response + return StreamingResponse( + async_stream_zip(member_files()), + media_type="application/zip", + headers={ + "Content-Disposition": f"attachment; filename={artifact_alias}.zip" + }, + ) - # Return the ZIP file as a streaming response - return StreamingResponse( - async_stream_zip(member_files()), - media_type="application/zip", - headers={ - "Content-Disposition": f"attachment; filename={artifact_alias}.zip" - }, + except KeyError: + raise HTTPException(status_code=404, detail="Artifact not found") + except PermissionError: + raise HTTPException(status_code=403, detail="Permission denied") + except HTTPException as e: + logger.error(f"HTTPException: {str(e)}") + raise e # Re-raise HTTPExceptions to be handled by FastAPI + except Exception as e: + logger.error(f"Unhandled exception in create_zip: {str(e)}") + raise HTTPException( + status_code=500, detail=f"Internal server error: {str(e)}" + ) + + @router.get( + "/{workspace}/artifacts/{artifact_alias}/zip-files/{zip_file_path:path}" + ) + async def get_zip_file_content( + workspace: str, + artifact_alias: str, + zip_file_path: str, + path: str = "", + version: str = None, + token: str = None, + user_info: store.login_optional = Depends(store.login_optional), + ) -> Response: + """ + Serve content from a zip file stored in S3 without fully unzipping it. + `zip_file_path` is the path to the zip file, `path` is the relative path inside the zip file. + You can also use '/~/' to separate the zip file path and the relative path inside the zip file. + """ + + try: + # Validate artifact and permissions + artifact_id = self._validate_artifact_id( + artifact_alias, {"ws": workspace} + ) + session = await self._get_session(read_only=True) + if token: + user_info = await self.store.parse_user_token(token) + + if "/~/" in zip_file_path: + assert ( + not path + ), "You cannot specify a path when using a zip file with a tilde." 
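+                # Split "path/to/archive.zip/~/inner/path" into the zip key and the in-zip path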
+                zip_file_path, path = zip_file_path.split("/~/", 1)
+
+            async with session.begin():
+                # Fetch artifact and check permissions
+                (
+                    artifact,
+                    parent_artifact,
+                ) = await self._get_artifact_with_permission(
+                    user_info, artifact_id, "get_file", session
+                )
+                version_index = self._get_version_index(artifact, version)
+                s3_config = self._get_s3_config(artifact, parent_artifact)
+                async with self._create_client_async(s3_config) as s3_client:
+                    # Full key of the ZIP file in the S3 bucket
+                    s3_key = safe_join(
+                        s3_config["prefix"],
+                        workspace,
+                        f"{self._artifacts_dir}/{artifact.id}/v{version_index}/{zip_file_path}",
+                    )
+
+                    # Fetch the ZIP file metadata from S3 (to avoid downloading the whole file)
+                    try:
+                        zip_file_metadata = await s3_client.head_object(
+                            Bucket=self.workspace_bucket, Key=s3_key
+                        )
+                        content_length = zip_file_metadata["ContentLength"]
+                    except ClientError as e:
+                        # Check if the error is due to the file not being found (404)
+                        if e.response["Error"]["Code"] == "404":
+                            return JSONResponse(
+                                status_code=404,
+                                content={
+                                    "success": False,
+                                    "detail": f"ZIP file not found: {s3_key}",
+                                },
+                            )
+                        else:
+                            # For other types of errors, return a 500 response
+                            return JSONResponse(
+                                status_code=500,
+                                content={
+                                    "success": False,
+                                    "detail": f"Failed to fetch file metadata: {str(e)}",
+                                },
+                            )
+
+                    # Fetch the ZIP's central directory from cache or download if not cached
+                    cache_key = f"zip_tail:{self.workspace_bucket}:{s3_key}:{content_length}"
+                    zip_tail = await self._cache.get(cache_key)
+                    if zip_tail is None:
+                        zip_tail = await fetch_zip_tail(
+                            s3_client, self.workspace_bucket, s3_key, content_length
+                        )
+                        await self._cache.set(cache_key, zip_tail, ttl=60)
+
+                    # Open the in-memory ZIP tail and parse it
+                    with zipfile.ZipFile(BytesIO(zip_tail)) as zip_file:
+                        # If `path` is empty or ends with "/", treat it as a directory listing
+                        if not path or path.endswith("/"):
+                            directory_contents = []
+                            for zip_info in zip_file.infolist():
+                                # Handle root directory or subdirectory files
+                                if not path:
+ # Extract immediate children of the root directory + relative_path = zip_info.filename.strip("/") + if "/" not in relative_path: + # Top-level file + directory_contents.append( + { + "type": "file" + if not zip_info.is_dir() + else "directory", + "name": relative_path, + "size": zip_info.file_size + if not zip_info.is_dir() + else None, + "last_modified": datetime( + *zip_info.date_time + ).timestamp(), + } + ) + else: + # Top-level directory + top_level_dir = relative_path.split("/")[0] + if not any( + d["name"] == top_level_dir + and d["type"] == "directory" + for d in directory_contents + ): + directory_contents.append( + { + "type": "directory", + "name": top_level_dir, + } + ) + else: + # Subdirectory: Include only immediate children + if ( + zip_info.filename.startswith(path) + and zip_info.filename != path + ): + relative_path = zip_info.filename[ + len(path) : + ].strip("/") + if "/" in relative_path: + # Subdirectory case + child_name = relative_path.split("/")[0] + if not any( + d["name"] == child_name + and d["type"] == "directory" + for d in directory_contents + ): + directory_contents.append( + { + "type": "directory", + "name": child_name, + } + ) + else: + # File case + directory_contents.append( + { + "type": "file", + "name": relative_path, + "size": zip_info.file_size, + "last_modified": datetime( + *zip_info.date_time + ).timestamp(), + } + ) + return JSONResponse( + status_code=200, + content=directory_contents, + ) + + # Otherwise, find the file inside the ZIP + try: + zip_info = zip_file.getinfo(path) + except KeyError: + return JSONResponse( + status_code=404, + content={ + "success": False, + "detail": f"File not found inside ZIP: {path}", + }, + ) + + # Get the byte range of the file in the ZIP + file_offset = zip_info.header_offset + len( + zip_info.FileHeader() + ) + file_length = zip_info.file_size + range_header = ( + f"bytes={file_offset}-{file_offset + file_length - 1}" + ) + + # Fetch the file content from S3 using the calculated byte range + response = await s3_client.get_object( + Bucket=self.workspace_bucket, + Key=s3_key, + Range=range_header, + ) + + # Stream the content back to the user + return StreamingResponse( + response["Body"], + media_type="application/octet-stream", + headers={ + "Content-Disposition": f'attachment; filename="{zip_info.filename}"' + }, + ) except KeyError: raise HTTPException(status_code=404, detail="Artifact not found") @@ -506,7 +785,7 @@ async def get_file( s3_config["endpoint_url"], s3_config["public_endpoint_url"], ) - return RedirectResponse(url=presigned_url, status_code=302) + return RedirectResponse(url=presigned_url, status_code=307) except KeyError: raise HTTPException(status_code=404, detail="Artifact not found") diff --git a/hypha/s3.py b/hypha/s3.py index 91631b67..8f352c8e 100644 --- a/hypha/s3.py +++ b/hypha/s3.py @@ -8,8 +8,6 @@ from datetime import datetime from email.utils import formatdate from typing import Any, Dict -import zipfile -from io import BytesIO import botocore from aiobotocore.session import get_session @@ -121,22 +119,6 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: await self.background() -async def fetch_zip_tail(s3_client, workspace_bucket, s3_key, content_length): - """ - Fetch the tail part of the zip file that contains the central directory. - This result is cached to avoid re-fetching the central directory multiple times. 
- """ - central_directory_offset = max(content_length - 65536, 0) - range_header = f"bytes={central_directory_offset}-{content_length}" - - # Fetch the last part of the ZIP file that contains the central directory - response = await s3_client.get_object( - Bucket=workspace_bucket, Key=s3_key, Range=range_header - ) - zip_tail = await response["Body"].read() - return zip_tail - - DEFAULT_CORS_POLICY = { "CORSRules": [ { @@ -261,113 +243,6 @@ async def upload_file( return await self._upload_file(path, request) - @router.get("/{workspace}/files/{zip_file_path:path}.zip/{path:path}") - async def get_zip_file_content( - workspace: str, - zip_file_path: str, - path: str, - user_info: store.login_optional = Depends(store.login_optional), - ) -> Response: - """ - Serve content from a zip file stored in S3 without fully unzipping it. - `zip_file_path` is the path to the zip file, `path` is the relative path inside the zip file. - """ - if not user_info.check_permission(workspace, UserPermission.read): - return JSONResponse( - status_code=403, - content={ - "success": False, - "detail": f"{user_info['username']} has no permission to access {workspace}/{path}", - }, - ) - - # Full key of the ZIP file in the S3 bucket - s3_key = f"{workspace}/{zip_file_path}.zip" - - try: - # S3 client setup - async with self.create_client_async() as s3_client: - # Fetch the ZIP file metadata from S3 (to avoid downloading the whole file) - try: - # Fetch the ZIP file metadata from S3 (to avoid downloading the whole file) - zip_file_metadata = await s3_client.head_object( - Bucket=self.workspace_bucket, Key=s3_key - ) - content_length = zip_file_metadata["ContentLength"] - except ClientError as e: - # Check if the error is due to the file not being found (404) - if e.response["Error"]["Code"] == "404": - return JSONResponse( - status_code=404, - content={ - "success": False, - "detail": f"ZIP file not found: {s3_key}", - }, - ) - else: - # For other types of errors, raise a 500 error - return JSONResponse( - status_code=500, - content={ - "success": False, - "detail": f"Failed to fetch file metadata: {str(e)}", - }, - ) - - # Fetch the ZIP's central directory from cache or download if not cached - cache_key = ( - f"zip_tail:{self.workspace_bucket}:{s3_key}:{content_length}" - ) - zip_tail = await cache.get(cache_key) - if zip_tail is None: - zip_tail = await fetch_zip_tail( - s3_client, self.workspace_bucket, s3_key, content_length - ) - await cache.set(cache_key, zip_tail, ttl=60) - - # Open the in-memory ZIP tail and parse it - with zipfile.ZipFile(BytesIO(zip_tail)) as zip_file: - # Find the file inside the ZIP - try: - zip_info = zip_file.getinfo(path) - except KeyError: - return JSONResponse( - status_code=404, - content={ - "success": False, - "detail": f"File not found inside ZIP: {path}", - }, - ) - - # Get the byte range of the file in the ZIP - file_offset = zip_info.header_offset + len( - zip_info.FileHeader() - ) - file_length = zip_info.file_size - range_header = ( - f"bytes={file_offset}-{file_offset + file_length - 1}" - ) - - # Fetch the file content from S3 using the calculated byte range - response = await s3_client.get_object( - Bucket=self.workspace_bucket, Key=s3_key, Range=range_header - ) - - # Stream the content back to the user - return StreamingResponse( - response["Body"], - media_type="application/octet-stream", - headers={ - "Content-Disposition": f'attachment; filename="{zip_info.filename}"' - }, - ) - - except Exception as exp: - return JSONResponse( - status_code=500, - 
content={"success": False, "detail": str(exp)}, - ) - if self.enable_s3_proxy: class S3ProxyConfig(BaseURLProxyConfigMixin, ProxyConfig): diff --git a/tests/test_artifact.py b/tests/test_artifact.py index 64d15835..7cf03e6f 100644 --- a/tests/test_artifact.py +++ b/tests/test_artifact.py @@ -488,7 +488,7 @@ async def test_http_file_and_directory_endpoint( assert artifact["download_count"] == 2 # Attempt to list directory contents - async with httpx.AsyncClient(timeout=20) as client: + async with httpx.AsyncClient(timeout=200) as client: response = await client.get( f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/files/" ) @@ -496,9 +496,9 @@ async def test_http_file_and_directory_endpoint( assert "example.txt" in [file["name"] for file in response.json()] # Get the zip file with specific files - async with httpx.AsyncClient(timeout=20) as client: + async with httpx.AsyncClient(timeout=200) as client: response = await client.get( - f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/zip-files?file=example.txt&file={nested_file_path}" + f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/create-zip-file?file=example.txt&file={nested_file_path}" ) assert response.status_code == 200 # Write the zip file in a io.BytesIO object, then check if the file contents are correct @@ -510,9 +510,9 @@ async def test_http_file_and_directory_endpoint( assert zip_file.read("nested/example2.txt").decode() == file_contents2 # Get the zip file with all files - async with httpx.AsyncClient(timeout=20) as client: + async with httpx.AsyncClient(timeout=200) as client: response = await client.get( - f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/zip-files" + f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/create-zip-file" ) assert response.status_code == 200, response.text zip_file = ZipFile(BytesIO(response.content)) @@ -522,6 +522,143 @@ async def test_http_file_and_directory_endpoint( assert zip_file.read("example.txt").decode() == "file contents of example.txt" assert zip_file.read("nested/example2.txt").decode() == file_contents2 + # check download count + artifact = await artifact_manager.read( + artifact_id=dataset.id, + ) + assert artifact["download_count"] == 8 + + +async def test_get_zip_file_content_endpoint( + minio_server, fastapi_server, test_user_token +): + """Test retrieving specific content and listing directories from a ZIP file stored in S3.""" + + # Connect and get the artifact manager service + api = await connect_to_server( + {"name": "test-client", "server_url": SERVER_URL, "token": test_user_token} + ) + artifact_manager = await api.get_service("public/artifact-manager") + + # Create a collection and retrieve the UUID + collection_manifest = { + "name": "test-collection", + "description": "A test collection", + } + collection = await artifact_manager.create( + type="collection", + manifest=collection_manifest, + config={"permissions": {"*": "r", "@": "r+"}}, + ) + + # Create a dataset within the collection + dataset_manifest = { + "name": "test-dataset", + "description": "A test dataset", + } + dataset = await artifact_manager.create( + type="dataset", + parent_id=collection.id, + manifest=dataset_manifest, + version="stage", + ) + + # Create a ZIP file in memory + zip_buffer = BytesIO() + with ZipFile(zip_buffer, "w") as zip_file: + zip_file.writestr("example.txt", "file contents of example.txt") + zip_file.writestr("nested/example2.txt", "file contents of nested/example2.txt") + zip_file.writestr( + 
"nested/subdir/example3.txt", "file contents of nested/subdir/example3.txt" + ) + zip_buffer.seek(0) + + # Upload the ZIP file to the artifact + zip_file_path = "test-files" + put_url = await artifact_manager.put_file( + artifact_id=dataset.id, + file_path=f"{zip_file_path}.zip", + download_weight=1, + ) + async with httpx.AsyncClient(timeout=30) as client: + response = await client.put(put_url, data=zip_buffer.read()) + assert response.status_code == 200 + + # Commit the dataset artifact + await artifact_manager.commit(artifact_id=dataset.id) + + # Test listing the root directory of the ZIP + async with httpx.AsyncClient(timeout=30) as client: + response = await client.get( + f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/zip-files/{zip_file_path}.zip" + ) + assert response.status_code == 200 + items = response.json() + assert find_item(items, "name", "example.txt") is not None + assert find_item(items, "name", "nested") is not None + + # Test listing the "nested/" directory + async with httpx.AsyncClient(timeout=30) as client: + response = await client.get( + f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/zip-files/{zip_file_path}.zip?path=nested/" + ) + assert response.status_code == 200 + items = response.json() + assert find_item(items, "name", "example2.txt") is not None + assert find_item(items, "name", "subdir") is not None + + async with httpx.AsyncClient(timeout=30) as client: + response = await client.get( + f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/zip-files/{zip_file_path}.zip/~/nested/" + ) + assert response.status_code == 200 + items = response.json() + assert find_item(items, "name", "example2.txt") is not None + assert find_item(items, "name", "subdir") is not None + + # Test listing the "nested/subdir/" directory + async with httpx.AsyncClient(timeout=30) as client: + response = await client.get( + f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/zip-files/{zip_file_path}.zip?path=nested/subdir/" + ) + assert response.status_code == 200 + items = response.json() + assert find_item(items, "name", "example3.txt") is not None + + # Test retrieving `example.txt` from the ZIP file + async with httpx.AsyncClient(timeout=30) as client: + response = await client.get( + f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/zip-files/{zip_file_path}.zip?path=example.txt" + ) + assert response.status_code == 200 + assert response.text == "file contents of example.txt" + + # Test retrieving `nested/example2.txt` from the ZIP file + async with httpx.AsyncClient(timeout=30) as client: + response = await client.get( + f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/zip-files/{zip_file_path}.zip?path=nested/example2.txt" + ) + assert response.status_code == 200 + assert response.text == "file contents of nested/example2.txt" + + # Test retrieving a non-existent file + async with httpx.AsyncClient(timeout=30) as client: + response = await client.get( + f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/zip-files/{zip_file_path}.zip?path=nonexistent.txt" + ) + assert response.status_code == 404 + assert response.json()["detail"] == "File not found inside ZIP: nonexistent.txt" + + # Test listing a non-existent directory + async with httpx.AsyncClient(timeout=30) as client: + response = await client.get( + f"{SERVER_URL}/{api.config.workspace}/artifacts/{dataset.alias}/zip-files/{zip_file_path}.zip?path=nonexistent/" + ) + assert response.status_code == 200 + assert ( + 
response.json() == [] + ) # An empty list indicates an empty or non-existent directory. + async def test_artifact_permissions( minio_server, fastapi_server, test_user_token, test_user_token_2 diff --git a/tests/test_s3.py b/tests/test_s3.py index 614f6c57..84303757 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -81,85 +81,6 @@ async def test_s3_proxy(minio_server, fastapi_server, test_user_token): assert response.headers["access-control-allow-origin"] == "http://localhost" -async def test_zip_file_endpoint(minio_server, fastapi_server, test_user_token): - """Test fetching files from inside the uploaded ZIP.""" - api = await connect_to_server( - { - "name": "anonymous client", - "server_url": WS_SERVER_URL, - "token": test_user_token, - } - ) - workspace = api.config["workspace"] - token = await api.generate_token() - - # Generate credentials and get the s3controller - s3controller = await api.get_service("public/s3-storage") - - # Create an in-memory ZIP file - zip_buffer = io.BytesIO() - with zipfile.ZipFile(zip_buffer, "w") as zf: - zf.writestr("file1.txt", "This is the content of file 1.") - zf.writestr("file2.txt", "This is the content of file 2.") - zf.writestr( - "subdir/file3.txt", "This is the content of file 3 inside a subdirectory." - ) - - zip_buffer.seek(0) - - # Generate a presigned URL to upload the ZIP file - presigned_url = await s3controller.generate_presigned_url( - f"apps/test.zip", client_method="put_object" - ) - - # Use requests to upload the ZIP file to the presigned URL - response = requests.put( - presigned_url, - data=zip_buffer.read(), - headers={"Content-Type": "application/zip"}, - ) - - assert response.status_code == 200, "Failed to upload the ZIP file to S3." - - response = requests.get( - f"{SERVER_URL}/{workspace}/files/apps/test22.zip/file1.txt", - headers={"Authorization": f"Bearer {token}"}, - ) - - assert response.status_code == 404, "Should return 404 for non-existent file." - - # Test retrieving the first file inside the ZIP - response = requests.get( - f"{SERVER_URL}/{workspace}/files/apps/test.zip/file1.txt", - headers={"Authorization": f"Bearer {token}"}, - ) - assert response.status_code == 200 - assert response.text == "This is the content of file 1." - - # Test retrieving the second file inside the ZIP - response = requests.get( - f"{SERVER_URL}/{workspace}/files/apps/test.zip/file2.txt", - headers={"Authorization": f"Bearer {token}"}, - ) - assert response.status_code == 200 - assert response.text == "This is the content of file 2." - - # Test retrieving a file from inside a subdirectory - response = requests.get( - f"{SERVER_URL}/{workspace}/files/apps/test.zip/subdir/file3.txt", - headers={"Authorization": f"Bearer {token}"}, - ) - assert response.status_code == 200 - assert response.text == "This is the content of file 3 inside a subdirectory." - - # Test retrieving a non-existent file - response = requests.get( - f"{SERVER_URL}/{workspace}/files/apps/test.zip/non-existent.txt", - headers={"Authorization": f"Bearer {token}"}, - ) - assert response.status_code == 404 - - # pylint: disable=too-many-statements async def test_s3(minio_server, fastapi_server, test_user_token): """Test s3 service."""