Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve redis event bus #718

Merged
merged 6 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Hypha Change Log

### 0.20.40

- Add vector store service to support vector search and retrieval.
- Fix zenodo file upload issue
- Speed up server by removing the `asyncio.sleep(0.01)` throttling and support concurrent handling of events in the redis event bus.

### 0.20.39

- Revise artifact manager to use artifact id as the primary key, remove `prefix` based keys.
Expand Down
2 changes: 1 addition & 1 deletion docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ svc = await get_remote_service("http://localhost:9527/ws-user-scintillating-lawy
Include the following script in your HTML file to load the `hypha-rpc` client:

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].39/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].40/dist/hypha-rpc-websocket.min.js"></script>
```

Use the following code in JavaScript to connect to the server and access an existing service:
Expand Down
10 changes: 5 additions & 5 deletions docs/migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ To connect to the server, instead of installing the `imjoy-rpc` module, you will
pip install -U hypha-rpc # new install
```

We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.39` is compatible with Hypha server version `0.20.39`.
We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.40` is compatible with Hypha server version `0.20.40`.

#### 2. Change the imports to use `hypha-rpc`

Expand Down Expand Up @@ -128,10 +128,10 @@ loop.run_forever()
To connect to the server, instead of using the `imjoy-rpc` module, you will need to use the `hypha-rpc` module. The `hypha-rpc` module is a standalone module that provides the RPC connection to the Hypha server. You can include it in your HTML using a script tag:

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].39/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].40/dist/hypha-rpc-websocket.min.js"></script>
```

We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.39` is compatible with Hypha server version `0.20.39`.
We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.40` is compatible with Hypha server version `0.20.40`.

#### 2. Change the connection method and use camelCase for service function names

Expand All @@ -149,7 +149,7 @@ Here is a suggested list of search and replace operations to update your code:
Here is an example of how the updated code might look:

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].39/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].40/dist/hypha-rpc-websocket.min.js"></script>
<script>
async function main(){
const server = await hyphaWebsocketClient.connectToServer({"server_url": "https://hypha.amun.ai"});
Expand Down Expand Up @@ -197,7 +197,7 @@ We created a tutorial to introduce this new feature: [service type annotation](.
Here is a quick example in JavaScript:

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].39/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].40/dist/hypha-rpc-websocket.min.js"></script>

<script>
async function main(){
Expand Down
2 changes: 1 addition & 1 deletion docs/service-type-annotation.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ if __name__ == "__main__":
**JavaScript Client: Service Usage**

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].39/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].40/dist/hypha-rpc-websocket.min.js"></script>
<script>
async function main() {
const server = await hyphaWebsocketClient.connectToServer({"server_url": "https://hypha.amun.ai"});
Expand Down
2 changes: 1 addition & 1 deletion helm-charts/aks-hypha.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ replicaCount: 1
image:
repository: ghcr.io/amun-ai/hypha
pullPolicy: IfNotPresent
tag: "0.20.39"
tag: "0.20.40"

serviceAccount:
create: true
Expand Down
2 changes: 1 addition & 1 deletion helm-charts/hypha-server/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.20.39
version: 0.20.40

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
Expand Down
2 changes: 1 addition & 1 deletion helm-charts/hypha-server/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ image:
repository: ghcr.io/amun-ai/hypha
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: "0.20.39"
tag: "0.20.40"

imagePullSecrets: []
nameOverride: ""
Expand Down
2 changes: 1 addition & 1 deletion hypha/VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "0.20.39.post20"
"version": "0.20.40"
}
79 changes: 40 additions & 39 deletions hypha/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -1798,46 +1798,47 @@ async def commit(

versions = artifact.versions or []
artifact.config = artifact.config or {}
if artifact.staging:
s3_config = self._get_s3_config(artifact, parent_artifact)
async with self._create_client_async(
s3_config,
) as s3_client:
download_weights = {}
for file_info in artifact.staging:
file_key = safe_join(
s3_config["prefix"],
artifact.workspace,
f"{self._artifacts_dir}/{artifact.id}/v{version_index}/{file_info['path']}",
)
try:
await s3_client.head_object(
Bucket=s3_config["bucket"], Key=file_key
)
except ClientError:
raise FileNotFoundError(
f"File '{file_info['path']}' does not exist in the artifact."
)
if (
file_info.get("download_weight") is not None
and file_info["download_weight"] > 0
):
download_weights[file_info["path"]] = file_info[
"download_weight"
]
if download_weights:
artifact.config["download_weights"] = download_weights
flag_modified(artifact, "config")

artifact.file_count = await self._count_files_in_prefix(
s3_client,
s3_config["bucket"],
safe_join(
s3_config["prefix"],
artifact.workspace,
f"{self._artifacts_dir}/{artifact.id}/v{version_index}",
),

s3_config = self._get_s3_config(artifact, parent_artifact)
async with self._create_client_async(
s3_config,
) as s3_client:
download_weights = {}
for file_info in artifact.staging or []:
file_key = safe_join(
s3_config["prefix"],
artifact.workspace,
f"{self._artifacts_dir}/{artifact.id}/v{version_index}/{file_info['path']}",
)
try:
await s3_client.head_object(
Bucket=s3_config["bucket"], Key=file_key
)
except ClientError:
raise FileNotFoundError(
f"File '{file_info['path']}' does not exist in the artifact."
)
if (
file_info.get("download_weight") is not None
and file_info["download_weight"] > 0
):
download_weights[file_info["path"]] = file_info[
"download_weight"
]

if download_weights:
artifact.config["download_weights"] = download_weights
flag_modified(artifact, "config")

artifact.file_count = await self._count_files_in_prefix(
s3_client,
s3_config["bucket"],
safe_join(
s3_config["prefix"],
artifact.workspace,
f"{self._artifacts_dir}/{artifact.id}/v{version_index}",
),
)

parent_artifact_config = (
parent_artifact.config if parent_artifact else {}
Expand Down
74 changes: 45 additions & 29 deletions hypha/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import logging
import sys
import os
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from pydantic import BaseModel, Field, field_validator
Expand Down Expand Up @@ -547,7 +548,11 @@ async def init(self):
loop = asyncio.get_running_loop()
self._ready = loop.create_future()
self._loop = loop

# Start the Redis subscription task
loop.create_task(self._subscribe_redis())

# Wait for readiness signal
await self._ready

def on(self, event_name, func):
Expand Down Expand Up @@ -652,39 +657,50 @@ def stop(self):
self._stop = True

async def _subscribe_redis(self):
cpu_count = os.cpu_count() or 1
concurrent_tasks = cpu_count * 10
pubsub = self._redis.pubsub()
self._stop = False
try:
await pubsub.psubscribe("event:*")
self._ready.set_result(True)
while self._stop is False:
msg = await pubsub.get_message(ignore_subscribe_messages=True)
semaphore = asyncio.Semaphore(concurrent_tasks) # Limit concurrent tasks

async def process_message(msg):
"""Process a single message while respecting the semaphore."""
async with semaphore: # Acquire semaphore
try:
if msg:
channel = msg["channel"].decode("utf-8")
RedisEventBus._counter.labels(event="*").inc()
if channel.startswith("event:b:"):
event_type = channel[8:]
data = msg["data"]
await self._redis_event_bus.emit(event_type, data)
if ":" not in event_type:
RedisEventBus._counter.labels(event=event_type).inc()
elif channel.startswith("event:d:"):
event_type = channel[8:]
data = json.loads(msg["data"])
await self._redis_event_bus.emit(event_type, data)
if ":" not in event_type:
RedisEventBus._counter.labels(event=event_type).inc()
elif channel.startswith("event:s:"):
event_type = channel[8:]
data = msg["data"].decode("utf-8")
await self._redis_event_bus.emit(event_type, data)
if ":" not in event_type:
RedisEventBus._counter.labels(event=event_type).inc()
else:
logger.info("Unknown channel: %s", channel)
channel = msg["channel"].decode("utf-8")
RedisEventBus._counter.labels(event="*").inc()

if channel.startswith("event:b:"):
event_type = channel[8:]
data = msg["data"]
await self._redis_event_bus.emit(event_type, data)
if ":" not in event_type:
RedisEventBus._counter.labels(event=event_type).inc()
elif channel.startswith("event:d:"):
event_type = channel[8:]
data = json.loads(msg["data"])
await self._redis_event_bus.emit(event_type, data)
if ":" not in event_type:
RedisEventBus._counter.labels(event=event_type).inc()
elif channel.startswith("event:s:"):
event_type = channel[8:]
data = msg["data"].decode("utf-8")
await self._redis_event_bus.emit(event_type, data)
if ":" not in event_type:
RedisEventBus._counter.labels(event=event_type).inc()
else:
logger.info("Unknown channel: %s", channel)
except Exception as exp:
logger.exception(f"Error processing message: {exp}")
await asyncio.sleep(0)

try:
await pubsub.psubscribe("event:*")
self._ready.set_result(True)
while not self._stop:
msg = await pubsub.get_message(
ignore_subscribe_messages=True, timeout=0.05
)
if msg:
asyncio.create_task(process_message(msg)) # Add task to pool
except Exception as exp:
self._ready.set_exception(exp)
7 changes: 7 additions & 0 deletions hypha/core/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,4 +907,11 @@ async def teardown(self):
await self.remove_client(
client_id, self._root_user.get_workspace(), self._root_user, unload=True
)
if self._websocket_server:
websockets = self._websocket_server.get_websockets()
for ws in websockets.values():
try:
await ws.close()
except GeneratorExit:
pass
logger.info("Teardown complete")
2 changes: 1 addition & 1 deletion hypha/templates/hypha-core-app/hypha-app-webpython.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ loadPyodide().then(async (pyodide) => {
pyodide.setStderr({ batched: (msg) => console.error(msg) });
await pyodide.loadPackage("micropip");
const micropip = pyodide.pyimport("micropip");
await micropip.install('hypha-rpc==0.20.39');
await micropip.install('hypha-rpc==0.20.40');
const isWindow = typeof window !== "undefined";

setTimeout(() => {
Expand Down
4 changes: 2 additions & 2 deletions hypha/templates/ws/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@
<body class="bg-black text-white font-poppins">
<div id="app"></div>
<script type="module">
import { HyphaCore } from "https://cdn.jsdelivr.net/npm/[email protected].39/dist/hypha-core.mjs";
import { HyphaCore } from "https://cdn.jsdelivr.net/npm/[email protected].40/dist/hypha-core.mjs";

const defaultService = {
getServerConfig(context){
Expand Down Expand Up @@ -765,7 +765,7 @@ <h2 className="text-2xl font-bold text-white">Artifact Details</h2>
<h3 className="text-lg font-bold">Files:</h3>
<ul className="list-disc list-inside text-gray-300 mt-2">
{files.map((file) => (
<li key={file.name} className="break-all flex items-center justify-between">
<li key={file.name} className="break-all flex mb-1 items-center justify-between">
<span><i className={`fas ${file.type==='directory' ? 'fa-folder' : 'fa-file-alt'} mr-2`}></i>{file.name}</span>
{file.type === "file" && (
<div className="flex space-x-2">
Expand Down
25 changes: 19 additions & 6 deletions hypha/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ async def websocket_endpoint(
return
else:
logger.warning("Rejecting legacy imjoy-rpc client (%s)", client_id)
await websocket.close(
code=status.WS_1008_POLICY_VIOLATION,
reason="Connection rejected, please upgrade to `hypha-rpc` which pass the authentication information in the first message",
)
try:
await websocket.close(
code=status.WS_1008_POLICY_VIOLATION,
reason="Connection rejected, please upgrade to `hypha-rpc` which pass the authentication information in the first message",
)
except GeneratorExit:
pass
return

try:
Expand Down Expand Up @@ -164,11 +167,18 @@ async def websocket_endpoint(
e,
)

def get_websockets(self):
"""Get the active websockets."""
return self._websockets

async def force_disconnect(self, workspace, client_id, code, reason):
"""Force disconnect a client."""
assert f"{workspace}/{client_id}" in self._websockets, "Client not connected"
websocket = self._websockets[f"{workspace}/{client_id}"]
await websocket.close(code=code, reason=reason)
try:
await websocket.close(code=code, reason=reason)
except GeneratorExit:
pass # Suppress GeneratorExit to avoid RuntimeError

async def check_client(self, client_id, workspace, user_info):
"""Check if the client is already connected."""
Expand Down Expand Up @@ -367,7 +377,10 @@ async def disconnect(self, websocket, reason, code=status.WS_1000_NORMAL_CLOSURE
await websocket.send_text(json.dumps({"type": "error", "message": reason}))
try:
if websocket.client_state.name not in ["CLOSED", "CLOSING", "DISCONNECTED"]:
await websocket.close(code=code, reason=reason)
try:
await websocket.close(code=code, reason=reason)
except GeneratorExit:
pass # Suppress GeneratorExit to avoid RuntimeError
except Exception as e:
logger.error(f"Error disconnecting websocket: {str(e)}")

Expand Down
5 changes: 4 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
[pytest]
asyncio_mode = strict
asyncio_mode = strict
filterwarnings =
ignore::DeprecationWarning
ignore::UserWarning
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ aiofiles==23.2.1
websockets==13.1
base58==2.1.1
fastapi==0.106.0
hypha-rpc==0.20.39
hypha-rpc==0.20.40
jinja2==3.1.4
lxml==4.9.3
msgpack==1.0.8
Expand Down
Loading
Loading