diff --git a/__init__.py b/__init__.py index 88196055..eeab0941 100644 --- a/__init__.py +++ b/__init__.py @@ -37,17 +37,7 @@ def update_mappings(module): update_mappings(mzkolors) update_mappings(segment_anything) -# bizy_server -bizyair_adv_is_not_installed = False try: - import crcmod - import oss2 -except ImportError: - bizyair_adv_is_not_installed = True - print( - "\n\n\033[91m[BizyAir]\033[0m Please run" - " 'pip install -r requirements.txt' to install depencies for model host feature.\n\n" - ) - -if not bizyair_adv_is_not_installed: import bizy_server +except Exception as e: + print("\n\n\033[91m[BizyAir]\033[0m Fail to import 'bizy_server':" f" {e}\n\n") diff --git a/requirements.txt b/requirements.txt index 0270ed14..f2293605 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1 @@ -oss2 -crcmod requests -aiofiles -aliyun-python-sdk-core -aliyun-python-sdk-kms \ No newline at end of file diff --git a/src/bizy_server/oss.py b/src/bizy_server/oss.py deleted file mode 100644 index f74825b8..00000000 --- a/src/bizy_server/oss.py +++ /dev/null @@ -1,210 +0,0 @@ -import asyncio -import ctypes -import io -import logging -import os -import threading - -import oss2 -from oss2.models import PartInfo -from tqdm import tqdm - -logging.basicConfig(level=logging.DEBUG) - - -class AliOssStorageClient: - def __init__( - self, - endpoint, - bucket_name, - access_key, - secret_key, - security_token, - onUploading, - onInterrupted, - ): - auth = ( - oss2.StsAuth(access_key, secret_key, security_token) - if security_token - else oss2.Auth(access_key, secret_key) - ) - self.bucket = oss2.Bucket(auth, endpoint, bucket_name) - self.bucket_name = bucket_name - self.region = endpoint - self.onUploading = onUploading - self.onInterrupted = onInterrupted - self.interrupt_flag = False - self.upload_thread = None - logging.debug( - f"New OSS storage client initialized: {self.bucket_name} in {self.region}" - ) - - def _upload_file_with_interrupt(self, file_path, object_name, progress_callback): - try: - self.bucket.put_object_from_file( - object_name, file_path, progress_callback=progress_callback - ) - except oss2.exceptions.OssError as e: - logging.error(f"Failed to upload file: {e}") - raise e - - def sync_upload_file(self, file_path, object_name): - total_size = os.path.getsize(file_path) - progress_bar = tqdm( - total=total_size, - unit="B", - unit_scale=True, - desc=f"\033[94m[BizyAir]\033[0m Uploading {os.path.basename(file_path)}", - ) - # 维护累计发送的字节数 - bytes_uploaded = 0 - - def progress_callback(bytes_sent, total_bytes): - nonlocal bytes_uploaded - progress_increment = bytes_sent - bytes_uploaded - progress_bar.update(progress_increment) - bytes_uploaded = bytes_sent # 更新累计已发送的字节数 - if self.onUploading: - self.onUploading(bytes_sent, total_bytes) - - try: - self.bucket.put_object_from_file( - object_name, file_path, progress_callback=progress_callback - ) - except oss2.exceptions.OssError as e: - logging.error(f"\033[31m[BizyAir]\033[0m Failed to upload file: {e}") - raise e - finally: - progress_bar.close() - - return f"{self.bucket_name}/{self.region}/{object_name}" - - async def upload_file(self, file_path, object_name): - total_size = os.path.getsize(file_path) - progress_bar = tqdm( - total=total_size, - unit="B", - unit_scale=True, - desc=f"\033[94m[BizyAir]\033[0m Uploading {os.path.basename(file_path)}", - ) - - # 维护累计发送的字节数 - bytes_uploaded = 0 - - def progress_callback(bytes_sent, total_bytes): - nonlocal bytes_uploaded - progress_increment = bytes_sent - bytes_uploaded - progress_bar.update(progress_increment) - bytes_uploaded = bytes_sent # 更新累计已发送的字节数 - if self.onUploading: - self.onUploading(bytes_sent, total_bytes) - - self.upload_thread = threading.Thread( - target=self._upload_file_with_interrupt, - args=(file_path, object_name, progress_callback), - ) - self.upload_thread.start() - - while self.upload_thread.is_alive(): - await asyncio.sleep(0.1) - if self.interrupt_flag: - self.interrupt() - if self.onInterrupted: - self.onInterrupted() - break - - self.upload_thread.join() - progress_bar.close() - - return f"{self.bucket_name}/{self.region}/{object_name}" - - async def upload_file_content(self, file_content, file_name, object_name): - total_size = len(file_content) - progress_bar = tqdm( - total=total_size, - unit="B", - unit_scale=True, - desc=f"\033[94m[BizyAir]\033[0m Uploading {file_name}", - ) - - # 维护累计发送的字节数 - bytes_uploaded = 0 - - def progress_callback(bytes_sent, total_bytes): - nonlocal bytes_uploaded - progress_increment = bytes_sent - bytes_uploaded - progress_bar.update(progress_increment) - bytes_uploaded = bytes_sent # 更新累计已发送的字节数 - if self.onUploading: - self.onUploading(bytes_sent, total_bytes) - - try: - loop = asyncio.get_running_loop() - await loop.run_in_executor( - None, - self.bucket.put_object, - object_name, - io.BytesIO(file_content), - None, - progress_callback, - ) - except oss2.exceptions.OssError as e: - logging.error(f"Failed to upload file content: {e}") - raise e - finally: - progress_bar.close() - - return f"{self.bucket_name}/{self.region}/{object_name}" - - async def multipart_upload(self, file_path, object_name): - total_size = os.path.getsize(file_path) - part_size = oss2.determine_part_size(total_size, preferred_size=1024 * 1024) - upload_id = self.bucket.init_multipart_upload(object_name).upload_id - - parts = [] - loop = asyncio.get_running_loop() - with open(file_path, "rb") as f: - for part_number in range(1, (total_size + part_size - 1) // part_size + 1): - offset = (part_number - 1) * part_size - size = min(part_size, total_size - offset) - result = await loop.run_in_executor( - None, - self.bucket.upload_part, - object_name, - upload_id, - part_number, - f.read(size), - ) - parts.append(PartInfo(part_number, result.etag)) - logging.debug(f"Uploaded part {part_number} for {object_name}") - - try: - await loop.run_in_executor( - None, - self.bucket.complete_multipart_upload, - object_name, - upload_id, - parts, - ) - except oss2.exceptions.OssError as e: - logging.error(f"Failed to complete multipart upload: {e}") - raise e - - return f"{self.bucket_name}/{self.region}/{object_name}" - - def interrupt(self): - if self.upload_thread: - exc = ctypes.py_object(SystemExit) - res = ctypes.pythonapi.PyThreadState_SetAsyncExc( - ctypes.c_long(self.upload_thread.ident), exc - ) - if res == 0: - raise ValueError("Invalid thread ID") - elif res > 1: - ctypes.pythonapi.PyThreadState_SetAsyncExc( - self.upload_thread.ident, None - ) - raise SystemError("PyThreadState_SetAsyncExc failed") - - def interruptUploading(self): - self.interrupt_flag = True diff --git a/src/bizy_server/server.py b/src/bizy_server/server.py index 7cbe407b..396766fa 100644 --- a/src/bizy_server/server.py +++ b/src/bizy_server/server.py @@ -11,7 +11,6 @@ from .api_client import APIClient from .errno import ErrorNo, errnos from .error_handler import ErrorHandler -from .oss import AliOssStorageClient from .resp import ErrResponse, OKResponse from .utils import base_model_types, check_str_param, check_type, is_string_valid, types @@ -114,7 +113,7 @@ async def commit_file(request): commit_data, err = await self.api_client.commit_file( signature=sha256sum, object_key=object_key, md5_hash=md5_hash ) - print("commit_data", commit_data) + # print("commit_data", commit_data) if err is not None: return ErrResponse(err) @@ -174,7 +173,7 @@ async def commit_bizy_model(request): if err: return ErrResponse(err) - print("resp------------------------------->", json_data, resp) + # print("resp------------------------------->", json_data, resp) # 开启线程检查同步状态 threading.Thread( target=self.check_sync_status, @@ -380,60 +379,6 @@ async def like_model_version(request): ) return ErrResponse(errnos.TOGGLE_USER_LIKE) - @self.prompt_server.routes.post(f"/{COMMUNITY_API}/files/upload") - async def upload_file(request): - try: - # 获取上传的文件 - reader = await request.multipart() - field = await reader.next() - if not field or field.name != "file": - return ErrResponse(errnos.NO_FILE_UPLOAD) - - filename = field.filename - if not filename: - return ErrResponse(errnos.NO_FILE_UPLOAD) - - # 读取文件内容 - file_content = await field.read(decode=False) - - filename = urllib.parse.quote(filename) - # 获取上传凭证 - ret, err = await self.api_client.get_upload_token(filename=filename) - if err: - return ErrResponse(err) - - # 解析返回的凭证信息 - file_info = ret["file"] - storage_info = ret["storage"] - - def updateProgress(consume_bytes, total_bytes): - # do nothing - pass - - oss_client = AliOssStorageClient( - endpoint=storage_info.get("endpoint"), - bucket_name=storage_info.get("bucket"), - access_key=file_info.get("access_key_id"), - secret_key=file_info.get("access_key_secret"), - security_token=file_info.get("security_token"), - onUploading=updateProgress, - onInterrupted=None, - ) - await oss_client.upload_file_content( - file_content=file_content, - file_name=filename, - object_name=file_info.get("object_key"), - ) - return OKResponse( - { - "url": f'https://{storage_info.get("bucket")}.{storage_info.get("endpoint")}/{file_info.get("object_key")}' - } - ) - - except Exception as e: - print(f"\033[31m[BizyAir]\033[0m Fail to upload file: {str(e)}") - return ErrResponse(errnos.UPLOAD) - @self.prompt_server.routes.get( f"/{COMMUNITY_API}/models/versions/{{model_version_id}}/workflow_json/{{sign}}" )