Skip to content

Commit

Permalink
Add app loader
Browse files Browse the repository at this point in the history
  • Loading branch information
oeway committed Aug 29, 2024
1 parent becfc86 commit 364905c
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 69 deletions.
136 changes: 83 additions & 53 deletions bioimageio/engine/ray_app_loader.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""Provide main entrypoint."""
import asyncio
import re
from ray import serve
import logging
import yaml
import os
import sys
from pathlib import Path
import urllib.request
from starlette.requests import Request
from starlette.responses import HTMLResponse

from hypha_rpc.utils import ObjectProxy
from hypha_rpc.sync import connect_to_server
Expand All @@ -17,99 +18,128 @@
logger.setLevel(logging.INFO)


def load_app(plugin_file):
def load_app(app_file, manifest):
"""Load app file."""
if os.path.isfile(plugin_file):
with open(plugin_file, "r", encoding="utf-8") as fil:
if os.path.isfile(app_file):
with open(app_file, "r", encoding="utf-8") as fil:
content = fil.read()
elif plugin_file.startswith("http"):
with urllib.request.urlopen(plugin_file) as response:
elif app_file.startswith("http"):
with urllib.request.urlopen(app_file) as response:
content = response.read().decode("utf-8")
# remove query string
plugin_file = plugin_file.split("?")[0]
app_file = app_file.split("?")[0]
else:
raise Exception(f"Invalid input app file path: {plugin_file}")
raise Exception(f"Invalid input app file path: {app_file}")

if plugin_file.endswith(".py"):
app_config = ObjectProxy()
if app_file.endswith(".py"):
app_info = ObjectProxy.fromDict(manifest)
import hypha_rpc
def export(app_class, config=None):
app_config.update(config or {})
app_config.app_class = app_class
def export(app_class):
# make sure app_class is a class, not an instance
if not isinstance(app_class, type):
raise RuntimeError("exported object must be a class")
app_info.app_class = app_class
hypha_rpc.api = ObjectProxy(export=export)
exec(content, globals()) # pylint: disable=exec-used
logger.info("Plugin executed")
return app_config
logger.info(f"App loaded: {app_info.name}")
# load manifest file if exists
return app_info
else:
raise RuntimeError(f"Invalid script file type ({plugin_file})")
raise RuntimeError(f"Invalid script file type ({app_file})")


def load_all_apps(work_dir):
ray_apps = {}
apps_dir = work_dir / "ray_apps"
for sub_dir in apps_dir.iterdir():
# check the subfolder for apps
# there should be a file named "manifest.yaml" in the subfolder
# if yes, load the app
# by parsing the manifest.yaml file first,
# find the entrypoint key with the file path
# set it to app_file
if sub_dir.is_dir():
manifest_file = sub_dir / "manifest.yaml"
if manifest_file.is_file():
with open(manifest_file, "r") as f:
manifest = yaml.safe_load(f)

# make sure the app_id is in lower case, no spaces, only underscores, letters, and numbers
pattern = r"^[a-z0-9_]*$"
assert re.match(pattern, manifest["id"]), "App ID must be in lower case, no spaces, only underscores, letters, and numbers"

assert manifest["runtime"] == "ray", "Only ray apps are supported"
app_file = sub_dir / manifest["entrypoint"]

if app_file.is_file() and app_file.suffix == ".py":
app_info = load_app(str(app_file), manifest)
ray_serve_config = manifest.get("ray_serve_config", {})
app_deployment = serve.deployment(name=app_info.id, **ray_serve_config)(app_info.app_class).bind()
manifest["app_bind"] = app_deployment
manifest["methods"] = [m for m in dir(app_info.app_class) if not m.startswith("_")]
ray_apps[app_info.id] = manifest

print("Loaded apps:", ray_apps.keys())

assert len(ray_apps) > 0, "No apps loaded"
return ray_apps

@serve.deployment
class HyphaApp:
def __init__(self, server_url, workspace, token, services):
class HyphaRayAppManager:
def __init__(self, server_url, workspace, token, ray_apps):
self.server_url = server_url
self._services = services
self._apps = ray_apps
self._hypha_server = connect_to_server({"server_url": server_url, "token": token, "workspace": workspace})
svc = {
"name": "Ray Functions",
"id": "ray-functions",
"name": "Hypha Ray Apps",
"id": "hypha-ray-apps",
"config": {
"visibility": "protected"
},
}

def create_service_function(name, service_handle):
def create_service_function(name, app_bind, method_name):
async def service_function(*args, **kwargs):
return await service_handle.translate.remote(*args, **kwargs)
method = getattr(app_bind, method_name)
return await method.remote(*args, **kwargs)
service_function.__name__ = name
return service_function

for service_name, service_bind in self._services.items():
svc[service_name] = create_service_function(service_name, service_bind)
for app_id, app_info in self._apps.items():
app_bind = app_info["app_bind"]
methods = app_info["methods"]
app_service = {}
for method in methods:
print(f"Registering method {method} for app {app_id}")
app_service[method] = create_service_function(method, app_bind, method)

svc[app_id] = app_service

info = self._hypha_server.register_service(svc, {"overwrite":True})
print("Hypha service info:", info)
print(f"Service URL: {self.server_url}/{workspace}/services/{info.id.split('/')[1]}")
self.info = info

async def __call__(self, request: Request):
redirect_url = f"{self.server_url}/{self.info.config.workspace}/services/{self.info.id.split('/')[1]}/translator?text=hello"
return HTMLResponse(
"""
<html>
<head>
<meta http-equiv="refresh" content="2; URL='{}'" />
</head>
<body>
<p>Redirecting to Hypha...</p>
<h1>Services:</h1>
<ul>
{}
</ul>
</body>
</html>
""".format(redirect_url, "\n".join([f"<li>{name}</li>" for name in self._services.keys()]))
)
# return a json object with the services
services = {}
for app_id, app_info in self._apps.items():
services[app_id] = app_info["methods"]
return services


current_dir = Path(os.path.dirname(os.path.realpath(__file__)))
ray_apps = load_all_apps(current_dir)

ray_apps = {}
apps_dir = current_dir / "ray_apps"
for app_file in apps_dir.iterdir():
if app_file.is_file() and app_file.suffix == ".py" and app_file.stem != "__init__":
load_app(str(app_file))
app_info = load_app(str(app_file))
app_deployment = serve.deployment(name=app_info.name)(app_info.app_class).bind()
ray_apps[app_info.name] = app_deployment

print("Loaded apps:", ray_apps.keys())
# Getting config from environment
server_url = os.environ.get("HYPHA_SERVER_URL")
workspace = os.environ.get("HYPHA_WORKSPACE")
token = os.environ.get("HYPHA_TOKEN")

assert server_url, "Server URL is not provided"

app = HyphaApp.bind(server_url, workspace, token, ray_apps)
app = HyphaRayAppManager.bind(server_url, workspace, token, ray_apps)


if __name__ == "__main__":
serve.start()
Expand Down
15 changes: 0 additions & 15 deletions bioimageio/engine/ray_apps/cellpose.py

This file was deleted.

17 changes: 17 additions & 0 deletions bioimageio/engine/ray_apps/cellpose/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from hypha_rpc import api

class CellposeModel:
def __init__(self):
# Load model
pass

def predict(self, image: str) -> str:
prediction = "prediction of cellpose model on image" + image
return prediction

def train(self, data: str, config: str) -> str:
training = "training cellpose model on data" + data + "with config" + config
return training


api.export(CellposeModel)
14 changes: 14 additions & 0 deletions bioimageio/engine/ray_apps/cellpose/manifest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
name: Cellpose
id: cellpose
description: Cellpose is a generalist algorithm for cell and nucleus segmentation
runtime: ray
entrypoint: __init__.py
ray_serve_config:
ray_actor_options:
runtime_env:
pip:
- numpy
- torch
autoscaling_config:
min_replicas: 0
max_replicas: 2
13 changes: 13 additions & 0 deletions bioimageio/engine/ray_apps/translator/manifest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name: Translator
id: translator
description: A simple translator that translates text from English to French
runtime: ray
entrypoint: translator.py
ray_serve_config:
ray_actor_options:
runtime_env:
pip:
- transformers
autoscaling_config:
min_replicas: 0
max_replicas: 2
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ def translate(self, text: str) -> str:
return translation


api.export(Translator, {"name": "translator"})
api.export(Translator)

0 comments on commit 364905c

Please sign in to comment.