Skip to content

Commit

Permalink
Merge pull request #9 from bioimage-io/support-ray-apps
Browse files Browse the repository at this point in the history
Support ray apps
  • Loading branch information
oeway authored Sep 2, 2024
2 parents 1c94900 + a0152e8 commit f9dcee5
Show file tree
Hide file tree
Showing 10 changed files with 510 additions and 91 deletions.
99 changes: 93 additions & 6 deletions bioimageio/engine/__main__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import os
import sys
import secrets
import asyncio
import subprocess
import ray
from hypha_rpc import connect_to_server
from hypha_launcher.api import HyphaLauncher
from hypha_launcher.utils.log import get_logger

import fire

logger = get_logger()
Expand Down Expand Up @@ -35,15 +37,100 @@ async def start_server(host = "0.0.0.0", port = 9000, public_base_url = "", laun

def connect_server(server_url, login_required=False):
    """Connect the engine to a server and keep the event loop running.

    Args:
        server_url: Server URL, passed unchanged to ``engine.connect_server``.
        login_required: Accepted for interface compatibility; this function
            does not read it.

    This call does not return normally: it blocks in ``loop.run_forever()``.
    """
    # Imported under an alias so the coroutine does not shadow this
    # function's own name inside its body.
    from engine import connect_server as _connect_engine

    def _report_failure(finished):
        # Surface a failed connection attempt instead of leaving the process
        # idling in run_forever() with no indication of what went wrong.
        if finished.cancelled():
            return
        error = finished.exception()
        if error is not None:
            logger.error(
                "Connection to %s failed", server_url, exc_info=error
            )

    loop = asyncio.get_event_loop()
    # The event loop only keeps weak references to tasks; holding one here
    # prevents the task from being garbage-collected mid-execution.
    task = loop.create_task(_connect_engine(server_url))
    task.add_done_callback(_report_failure)
    loop.run_forever()

def serve_ray_apps():
    """Run the Ray Serve application exported by ``ray_app_loader``.

    Both imports are deferred to call time, so Ray Serve and the app module
    are only loaded when this command is actually invoked.
    """
    from ray import serve as ray_serve
    import bioimageio.engine.ray_app_loader as app_loader

    application = app_loader.app
    ray_serve.run(application)
async def _run_ray_server_apps(address, ready_timeout):
    """Start the Ray apps manager as a supervised process and expose it on Hypha.

    Connects to Ray, connects to the Hypha server configured through the
    environment, registers a public ``ray-app-manager`` service offering
    ``serve`` and ``shutdown`` functions, and then starts the apps once.

    Args:
        address: Ray address. When falsy, the ``RAY_ADDRESS`` environment
            variable is used instead (which may itself be unset).
        ready_timeout: Passed to ``SupervisedProcess`` as ``ready_timeout``.

    Environment:
        HYPHA_SERVER_URL (required), HYPHA_WORKSPACE, HYPHA_TOKEN.

    Raises:
        AssertionError: If ``HYPHA_SERVER_URL`` is not set.
        Exception: If the supervised process does not become ready.
    """
    if not address:
        address = os.environ.get("RAY_ADDRESS")
    with ray.init(address=address) as client_context:
        dashboard_url = f"http://{client_context.dashboard_url}"
        logger.info(f"Dashboard URL: {dashboard_url}")
        server_url = os.environ.get("HYPHA_SERVER_URL")
        workspace = os.environ.get("HYPHA_WORKSPACE")
        token = os.environ.get("HYPHA_TOKEN")
        assert server_url, "HYPHA_SERVER_URL environment variable is not set"
        # Readiness is judged by the "ray-apps" service URL, not by the
        # "ray-app-manager" service registered below.
        health_check_url = f"{server_url}/{workspace}/services/ray-apps"
        logger.info(f"Health check URL: {health_check_url}")
        shutdown_command = f"serve shutdown -y --address={dashboard_url}"
        server = await connect_to_server(
            {"server_url": server_url, "token": token, "workspace": workspace}
        )
        # Built as an argument list directly rather than formatting a string
        # and splitting it on whitespace again.
        serve_command = ["serve", "run", "bioimageio.engine.ray_app_manager:app"]
        if address:
            serve_command.append(f"--address={address}")

        proc = None
        from simpervisor import SupervisedProcess
        import httpx

        async def ready_function(_):
            """Return True once the health check URL answers with HTTP 200."""
            try:
                async with httpx.AsyncClient() as client:
                    response = await client.get(health_check_url)
                    return response.status_code == 200
            except Exception as e:
                logger.warning(f"Error checking readiness: {str(e)}")
                return False

        async def serve_apps():
            """Start (or restart) the supervised ``serve run`` process."""
            nonlocal proc
            if proc:
                if await proc.ready():
                    return f"Ray Apps Manager already started at {server_url}/{workspace}/services/{svc.id.split('/')[1]}"
                # A previous process exists but is not ready: replace it.
                await proc.kill()
            name = "ray-apps-manager"
            proc = SupervisedProcess(
                name,
                *serve_command,
                env=os.environ.copy(),
                always_restart=False,
                ready_func=ready_function,
                ready_timeout=ready_timeout,
                log=logger,
            )

            try:
                await proc.start()

                is_ready = await proc.ready()

                if not is_ready:
                    # Fail whether or not the process is still alive; a
                    # process that already exited must not be reported as
                    # started. Only a live process needs to be killed.
                    if proc.running:
                        await proc.kill()
                    raise Exception(f"External services ({name}) failed to start")
            except Exception:
                logger.exception(f"External services ({name}) failed to start")
                raise
            return f"Ray Apps Manager started at {server_url}/{workspace}/services/{svc.id.split('/')[1]}"

        async def shutdown():
            """Stop the supervised process, or run ``serve shutdown`` if there is none."""
            nonlocal proc
            if proc:
                if proc.running:
                    await proc.kill()
                proc = None
            else:
                # os.system blocks, so run it in the default executor.
                loop = asyncio.get_event_loop()
                await loop.run_in_executor(None, os.system, shutdown_command)

        # serve_apps() reads svc.id lazily, so svc must be bound before the
        # first call below.
        svc = await server.register_service({
            "name": "Ray App Manager",
            "id": "ray-app-manager",
            "config": {
                "visibility": "public",
                "run_in_executor": True,
            },
            "serve": serve_apps,
            "shutdown": shutdown
        })

        # Start apps serve
        await serve_apps()

def serve_ray_apps(address: str = None, ready_timeout: int = 120):
    """Start the Ray apps manager and keep the event loop running.

    Args:
        address: Ray address forwarded to ``_run_ray_server_apps``; ``None``
            lets that coroutine fall back to its own default.
        ready_timeout: Readiness timeout forwarded to
            ``_run_ray_server_apps``.

    This call does not return normally: it blocks in ``loop.run_forever()``.
    """

    def _report_failure(finished):
        # Without this, a startup error would leave the process blocked in
        # run_forever() with nothing logged.
        if finished.cancelled():
            return
        error = finished.exception()
        if error is not None:
            logger.error("Ray apps manager failed to start", exc_info=error)

    loop = asyncio.get_event_loop()
    # The event loop only keeps weak references to tasks; holding one here
    # prevents the task from being garbage-collected mid-execution.
    task = loop.create_task(_run_ray_server_apps(address, ready_timeout))
    task.add_done_callback(_report_failure)
    loop.run_forever()

if __name__ == '__main__':
fire.Fire({
Expand Down
130 changes: 49 additions & 81 deletions bioimageio/engine/ray_app_loader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Provide main entrypoint."""
import asyncio
"""Provide ray app manager."""
import re
from ray import serve
import logging
Expand All @@ -8,13 +7,12 @@
import sys
from pathlib import Path
import urllib.request
from starlette.requests import Request

from hypha_rpc.utils import ObjectProxy
from hypha_rpc.sync import connect_to_server


logging.basicConfig(stream=sys.stdout)
logger = logging.getLogger("app_launcher")
logger = logging.getLogger("ray_app_manager")
logger.setLevel(logging.INFO)


Expand All @@ -34,110 +32,80 @@ def load_app(app_file, manifest):
if app_file.endswith(".py"):
app_info = ObjectProxy.fromDict(manifest)
import hypha_rpc

def export(app_class):
# make sure app_class is a class, not an instance
if not isinstance(app_class, type):
raise RuntimeError("exported object must be a class")
app_info.app_class = app_class
# expose the app class to the current module
globals()[app_class.__name__] = app_class

hypha_rpc.api = ObjectProxy(export=export)
exec(content, globals()) # pylint: disable=exec-used
exec(content, globals())
logger.info(f"App loaded: {app_info.name}")
# load manifest file if exists
return app_info
else:
raise RuntimeError(f"Invalid script file type ({app_file})")


def load_all_apps(work_dir):
def create_ray_serve_config(app_info):
    """Return the Ray Serve deployment options for an app, with required pip packages.

    Args:
        app_info: Mapping-like app description. Its optional
            ``ray_serve_config`` entry must contain
            ``ray_actor_options`` -> ``runtime_env``.

    Returns:
        The ``ray_serve_config`` mapping. When ``app_info`` provides one it is
        updated in place and returned; otherwise a fresh default is returned.
        Its ``runtime_env["pip"]`` list is guaranteed to contain ``hypha-rpc``
        and the bioengine source archive exactly once each.

    Raises:
        AssertionError: If ``ray_actor_options`` or ``runtime_env`` is missing.
    """
    ray_serve_config = app_info.get(
        "ray_serve_config", {"ray_actor_options": {"runtime_env": {}}}
    )
    assert (
        "ray_actor_options" in ray_serve_config
    ), "ray_actor_options must be provided in ray_serve_config"
    assert (
        "runtime_env" in ray_serve_config["ray_actor_options"]
    ), "runtime_env must be provided in ray_actor_options"
    runtime_env = ray_serve_config["ray_actor_options"]["runtime_env"]
    # A missing, None or empty "pip" entry is replaced by a fresh list.
    # NOTE(review): assumes a non-empty "pip" value is a list of requirement
    # strings - confirm no manifest uses another form.
    if not runtime_env.get("pip"):
        runtime_env["pip"] = []
    pip_packages = runtime_env["pip"]
    required = (
        "hypha-rpc",
        "https://github.com/bioimage-io/bioengine/archive/refs/heads/main.zip",
    )
    # Add each requirement only when absent, so calling this again on the
    # same config does not pile up duplicate entries.
    for requirement in required:
        if requirement not in pip_packages:
            pip_packages.append(requirement)
    return ray_serve_config

def load_all_apps() -> dict:
    """Load every Ray app found in the ``ray_apps`` directory next to this module.

    Each sub-directory containing a ``manifest.yaml`` is treated as one app.
    The manifest's ``entrypoint`` names the script to load, relative to that
    sub-directory. For every app a Ray Serve deployment is created and bound,
    and the public method names of the app class are recorded.

    Returns:
        Mapping of app ID to app info. Each app info carries ``app_bind``
        (the bound deployment) and ``methods`` (names on the app class that
        do not start with an underscore).

    Raises:
        AssertionError: If an app ID is malformed, a manifest's ``runtime``
            is not ``"ray"``, or no app was loaded at all.
    """
    current_dir = Path(os.path.dirname(os.path.realpath(__file__)))
    apps_dir = current_dir / "ray_apps"
    # make sure the app_id is in lower case, no spaces, only underscores, letters, and numbers
    pattern = r"^[a-z0-9_]*$"
    ray_apps = {}
    for sub_dir in apps_dir.iterdir():
        manifest_file = sub_dir / "manifest.yaml"
        # Only sub-directories that ship a manifest are apps.
        if not sub_dir.is_dir() or not manifest_file.is_file():
            continue
        with open(manifest_file, "r", encoding="utf-8") as f:
            manifest = yaml.safe_load(f)

        assert re.match(
            pattern, manifest["id"]
        ), "App ID must be in lower case, no spaces, only underscores, letters, and numbers"

        assert manifest["runtime"] == "ray", "Only ray apps are supported"
        app_file = sub_dir / manifest["entrypoint"]

        # Load each app exactly once and register the resulting app info.
        app_info = load_app(str(app_file), manifest)
        ray_serve_config = create_ray_serve_config(app_info)
        app_deployment = serve.deployment(
            name=app_info.id, **ray_serve_config
        )(app_info.app_class).bind()
        app_info.app_bind = app_deployment
        app_info.methods = [
            m for m in dir(app_info.app_class) if not m.startswith("_")
        ]
        ray_apps[app_info.id] = app_info

    print("Loaded apps:", ray_apps.keys())

    assert len(ray_apps) > 0, "No apps loaded"
    return ray_apps

@serve.deployment
class HyphaRayAppManager:
    """Ray Serve deployment that registers each loaded app as a Hypha service.

    On construction it connects to the Hypha server and registers one
    protected service per app, with one service function per app method.
    Calling the deployment returns the method names of every app.
    """

    def __init__(self, server_url, workspace, token, ray_apps):
        """Connect to Hypha and register a service for every entry of ``ray_apps``.

        Each ``ray_apps`` value is indexed by the keys ``app_bind``,
        ``methods``, ``name`` and ``description``.
        """
        self.server_url = server_url
        self._apps = ray_apps
        connection_config = {
            "server_url": server_url,
            "token": token,
            "workspace": workspace,
        }
        self._hypha_server = connect_to_server(connection_config)

        def create_service_function(name, app_bind, method_name):
            # A factory, so each service function captures its own method
            # name rather than the loop variable.
            async def service_function(*args, **kwargs):
                return await getattr(app_bind, method_name).remote(*args, **kwargs)

            service_function.__name__ = name
            return service_function

        for app_id, app_info in self._apps.items():
            app_bind = app_info["app_bind"]
            methods = app_info["methods"]
            app_service = dict(
                id=app_id,
                name=app_info["name"],
                description=app_info["description"],
                config=dict(visibility="protected"),
            )
            for method in methods:
                print(f"Registering method {method} for app {app_id}")
                app_service[method] = create_service_function(method, app_bind, method)
            info = self._hypha_server.register_service(app_service, {"overwrite":True})
            print(f"Added service {app_id} with id {info.id}, use it at {self.server_url}/{workspace}/services/{info.id.split('/')[1]}")

    async def __call__(self, request: Request):
        """Return a mapping of app ID to that app's method names."""
        return {
            app_id: app_info["methods"]
            for app_id, app_info in self._apps.items()
        }


# Everything below runs at import time: importing this module loads the apps
# and builds the Ray Serve application object `app`.
current_dir = Path(os.path.dirname(os.path.realpath(__file__)))
ray_apps = load_all_apps(current_dir)

# Read the Hypha connection settings from the environment. The workspace and
# token may be None when their variables are unset; only the server URL is
# checked.
server_url = os.environ.get("HYPHA_SERVER_URL")
workspace = os.environ.get("HYPHA_WORKSPACE")
token = os.environ.get("HYPHA_TOKEN")

# Stop the import early when no server URL is configured.
assert server_url, "Server URL is not provided"

# Bind the deployment with its constructor arguments; the result is what
# `serve.run(...)` is given.
app = HyphaRayAppManager.bind(server_url, workspace, token, ray_apps)


if __name__ == "__main__":
serve.start()
serve.run(app, name="bioengine-apps")
import asyncio
asyncio.get_event_loop().run_forever()
ray_apps = load_all_apps()
Loading

0 comments on commit f9dcee5

Please sign in to comment.