From 639035337d87ea36d5c80ff77b4895943a5a6bf9 Mon Sep 17 00:00:00 2001 From: Wei Ouyang Date: Mon, 25 Nov 2024 21:17:57 -0800 Subject: [PATCH 1/6] small fix --- hypha/VERSION | 2 +- hypha/core/__init__.py | 74 +++++++++++++++++++++++++----------------- hypha/core/store.py | 6 ++++ hypha/websocket.py | 25 ++++++++++---- pytest.ini | 5 ++- 5 files changed, 75 insertions(+), 37 deletions(-) diff --git a/hypha/VERSION b/hypha/VERSION index 99b962fd..0a3d03b9 100644 --- a/hypha/VERSION +++ b/hypha/VERSION @@ -1,3 +1,3 @@ { - "version": "0.20.39.post20" + "version": "0.20.40" } diff --git a/hypha/core/__init__.py b/hypha/core/__init__.py index 50757292..f76b335f 100644 --- a/hypha/core/__init__.py +++ b/hypha/core/__init__.py @@ -5,6 +5,7 @@ import json import logging import sys +import os from enum import Enum from typing import Any, Callable, Dict, List, Optional, Tuple, Union from pydantic import BaseModel, Field, field_validator @@ -547,7 +548,11 @@ async def init(self): loop = asyncio.get_running_loop() self._ready = loop.create_future() self._loop = loop + + # Start the Redis subscription task loop.create_task(self._subscribe_redis()) + + # Wait for readiness signal await self._ready def on(self, event_name, func): @@ -652,39 +657,50 @@ def stop(self): self._stop = True async def _subscribe_redis(self): + cpu_count = os.cpu_count() or 1 + concurrent_tasks = cpu_count * 10 pubsub = self._redis.pubsub() self._stop = False - try: - await pubsub.psubscribe("event:*") - self._ready.set_result(True) - while self._stop is False: - msg = await pubsub.get_message(ignore_subscribe_messages=True) + semaphore = asyncio.Semaphore(concurrent_tasks) # Limit concurrent tasks + + async def process_message(msg): + """Process a single message while respecting the semaphore.""" + async with semaphore: # Acquire semaphore try: - if msg: - channel = msg["channel"].decode("utf-8") - RedisEventBus._counter.labels(event="*").inc() - if channel.startswith("event:b:"): - event_type = channel[8:] - data = msg["data"] - await self._redis_event_bus.emit(event_type, data) - if ":" not in event_type: - RedisEventBus._counter.labels(event=event_type).inc() - elif channel.startswith("event:d:"): - event_type = channel[8:] - data = json.loads(msg["data"]) - await self._redis_event_bus.emit(event_type, data) - if ":" not in event_type: - RedisEventBus._counter.labels(event=event_type).inc() - elif channel.startswith("event:s:"): - event_type = channel[8:] - data = msg["data"].decode("utf-8") - await self._redis_event_bus.emit(event_type, data) - if ":" not in event_type: - RedisEventBus._counter.labels(event=event_type).inc() - else: - logger.info("Unknown channel: %s", channel) + channel = msg["channel"].decode("utf-8") + RedisEventBus._counter.labels(event="*").inc() + + if channel.startswith("event:b:"): + event_type = channel[8:] + data = msg["data"] + await self._redis_event_bus.emit(event_type, data) + if ":" not in event_type: + RedisEventBus._counter.labels(event=event_type).inc() + elif channel.startswith("event:d:"): + event_type = channel[8:] + data = json.loads(msg["data"]) + await self._redis_event_bus.emit(event_type, data) + if ":" not in event_type: + RedisEventBus._counter.labels(event=event_type).inc() + elif channel.startswith("event:s:"): + event_type = channel[8:] + data = msg["data"].decode("utf-8") + await self._redis_event_bus.emit(event_type, data) + if ":" not in event_type: + RedisEventBus._counter.labels(event=event_type).inc() + else: + logger.info("Unknown channel: %s", channel) except Exception as exp: logger.exception(f"Error processing message: {exp}") - await asyncio.sleep(0) + + try: + await pubsub.psubscribe("event:*") + self._ready.set_result(True) + while not self._stop: + msg = await pubsub.get_message( + ignore_subscribe_messages=True, timeout=0.1 + ) + if msg: + asyncio.create_task(process_message(msg)) # Add task to pool except Exception as exp: self._ready.set_exception(exp) diff --git a/hypha/core/store.py b/hypha/core/store.py index 1f2f3c65..c4352442 100644 --- a/hypha/core/store.py +++ b/hypha/core/store.py @@ -907,4 +907,10 @@ async def teardown(self): await self.remove_client( client_id, self._root_user.get_workspace(), self._root_user, unload=True ) + websockets = self._websocket_server.get_websockets() + for ws in websockets.values(): + try: + await ws.close() + except GeneratorExit: + pass logger.info("Teardown complete") diff --git a/hypha/websocket.py b/hypha/websocket.py index b69a204a..aef22fb8 100644 --- a/hypha/websocket.py +++ b/hypha/websocket.py @@ -66,10 +66,13 @@ async def websocket_endpoint( return else: logger.warning("Rejecting legacy imjoy-rpc client (%s)", client_id) - await websocket.close( - code=status.WS_1008_POLICY_VIOLATION, - reason="Connection rejected, please upgrade to `hypha-rpc` which pass the authentication information in the first message", - ) + try: + await websocket.close( + code=status.WS_1008_POLICY_VIOLATION, + reason="Connection rejected, please upgrade to `hypha-rpc` which pass the authentication information in the first message", + ) + except GeneratorExit: + pass return try: @@ -164,11 +167,18 @@ async def websocket_endpoint( e, ) + def get_websockets(self): + """Get the active websockets.""" + return self._websockets + async def force_disconnect(self, workspace, client_id, code, reason): """Force disconnect a client.""" assert f"{workspace}/{client_id}" in self._websockets, "Client not connected" websocket = self._websockets[f"{workspace}/{client_id}"] - await websocket.close(code=code, reason=reason) + try: + await websocket.close(code=code, reason=reason) + except GeneratorExit: + pass # Suppress GeneratorExit to avoid RuntimeError async def check_client(self, client_id, workspace, user_info): """Check if the client is already connected.""" @@ -367,7 +377,10 @@ async def disconnect(self, websocket, reason, code=status.WS_1000_NORMAL_CLOSURE await websocket.send_text(json.dumps({"type": "error", "message": reason})) try: if websocket.client_state.name not in ["CLOSED", "CLOSING", "DISCONNECTED"]: - await websocket.close(code=code, reason=reason) + try: + await websocket.close(code=code, reason=reason) + except GeneratorExit: + pass # Suppress GeneratorExit to avoid RuntimeError except Exception as e: logger.error(f"Error disconnecting websocket: {str(e)}") diff --git a/pytest.ini b/pytest.ini index d0f9fcc3..c2b8faf1 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,5 @@ [pytest] -asyncio_mode = strict \ No newline at end of file +asyncio_mode = strict +filterwarnings = + ignore::DeprecationWarning + ignore::UserWarning \ No newline at end of file From 9b2dc5fe121ec777cb91c010043de189f26c9924 Mon Sep 17 00:00:00 2001 From: Wei Ouyang Date: Mon, 25 Nov 2024 21:35:15 -0800 Subject: [PATCH 2/6] Improve the speed --- CHANGELOG.md | 6 ++++++ docs/getting-started.md | 2 +- docs/migration-guide.md | 10 +++++----- docs/service-type-annotation.md | 2 +- helm-charts/aks-hypha.md | 2 +- helm-charts/hypha-server/Chart.yaml | 2 +- helm-charts/hypha-server/values.yaml | 2 +- hypha/core/__init__.py | 2 +- hypha/templates/hypha-core-app/hypha-app-webpython.js | 2 +- hypha/templates/ws/index.html | 2 +- requirements.txt | 2 +- setup.py | 2 +- 12 files changed, 21 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c4c6669..494ca152 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Hypha Change Log +### 0.20.40 + + - Add vector store service to support vector search and retrieval. + - Fix zenodo file upload issue + - Speed up server by removing the `asyncio.sleep(0.01)` throttling and support concurrent handling of events in the redis event bus. + ### 0.20.39 - Revise artifact manager to use artifact id as the primary key, remove `prefix` based keys. diff --git a/docs/getting-started.md b/docs/getting-started.md index eb1fdaed..6db5031c 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -220,7 +220,7 @@ svc = await get_remote_service("http://localhost:9527/ws-user-scintillating-lawy Include the following script in your HTML file to load the `hypha-rpc` client: ```html - + ``` Use the following code in JavaScript to connect to the server and access an existing service: diff --git a/docs/migration-guide.md b/docs/migration-guide.md index 7ad14308..5be913e4 100644 --- a/docs/migration-guide.md +++ b/docs/migration-guide.md @@ -15,7 +15,7 @@ To connect to the server, instead of installing the `imjoy-rpc` module, you will pip install -U hypha-rpc # new install ``` -We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.39` is compatible with Hypha server version `0.20.39`. +We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.40` is compatible with Hypha server version `0.20.40`. #### 2. Change the imports to use `hypha-rpc` @@ -128,10 +128,10 @@ loop.run_forever() To connect to the server, instead of using the `imjoy-rpc` module, you will need to use the `hypha-rpc` module. The `hypha-rpc` module is a standalone module that provides the RPC connection to the Hypha server. You can include it in your HTML using a script tag: ```html - + ``` -We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.39` is compatible with Hypha server version `0.20.39`. +We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.40` is compatible with Hypha server version `0.20.40`. #### 2. Change the connection method and use camelCase for service function names @@ -149,7 +149,7 @@ Here is a suggested list of search and replace operations to update your code: Here is an example of how the updated code might look: ```html - + + +