diff --git a/bioimageio/engine/ray_app_loader.py b/bioimageio/engine/ray_app_loader.py index dc4af3d..b640790 100644 --- a/bioimageio/engine/ray_app_loader.py +++ b/bioimageio/engine/ray_app_loader.py @@ -1,13 +1,14 @@ """Provide main entrypoint.""" import asyncio +import re from ray import serve import logging +import yaml import os import sys from pathlib import Path import urllib.request from starlette.requests import Request -from starlette.responses import HTMLResponse from hypha_rpc.utils import ObjectProxy from hypha_rpc.sync import connect_to_server @@ -17,91 +18,119 @@ logger.setLevel(logging.INFO) -def load_app(plugin_file): +def load_app(app_file, manifest): """Load app file.""" - if os.path.isfile(plugin_file): - with open(plugin_file, "r", encoding="utf-8") as fil: + if os.path.isfile(app_file): + with open(app_file, "r", encoding="utf-8") as fil: content = fil.read() - elif plugin_file.startswith("http"): - with urllib.request.urlopen(plugin_file) as response: + elif app_file.startswith("http"): + with urllib.request.urlopen(app_file) as response: content = response.read().decode("utf-8") # remove query string - plugin_file = plugin_file.split("?")[0] + app_file = app_file.split("?")[0] else: - raise Exception(f"Invalid input app file path: {plugin_file}") + raise Exception(f"Invalid input app file path: {app_file}") - if plugin_file.endswith(".py"): - app_config = ObjectProxy() + if app_file.endswith(".py"): + app_info = ObjectProxy.fromDict(manifest) import hypha_rpc - def export(app_class, config=None): - app_config.update(config or {}) - app_config.app_class = app_class + def export(app_class): + # make sure app_class is a class, not an instance + if not isinstance(app_class, type): + raise RuntimeError("exported object must be a class") + app_info.app_class = app_class hypha_rpc.api = ObjectProxy(export=export) exec(content, globals()) # pylint: disable=exec-used - logger.info("Plugin executed") - return app_config + logger.info(f"App loaded: {app_info.name}") + # load manifest file if exists + return app_info else: - raise RuntimeError(f"Invalid script file type ({plugin_file})") + raise RuntimeError(f"Invalid script file type ({app_file})") + + +def load_all_apps(work_dir): + ray_apps = {} + apps_dir = work_dir / "ray_apps" + for sub_dir in apps_dir.iterdir(): + # check the subfolder for apps + # there should be a file named "manifest.yaml" in the subfolder + # if yes, load the app + # by parsing the manifest.yaml file first, + # find the entrypoint key with the file path + # set it to app_file + if sub_dir.is_dir(): + manifest_file = sub_dir / "manifest.yaml" + if manifest_file.is_file(): + with open(manifest_file, "r") as f: + manifest = yaml.safe_load(f) + + # make sure the app_id is in lower case, no spaces, only underscores, letters, and numbers + pattern = r"^[a-z0-9_]*$" + assert re.match(pattern, manifest["id"]), "App ID must be in lower case, no spaces, only underscores, letters, and numbers" + + assert manifest["runtime"] == "ray", "Only ray apps are supported" + app_file = sub_dir / manifest["entrypoint"] + + if app_file.is_file() and app_file.suffix == ".py": + app_info = load_app(str(app_file), manifest) + ray_serve_config = manifest.get("ray_serve_config", {}) + app_deployment = serve.deployment(name=app_info.id, **ray_serve_config)(app_info.app_class).bind() + manifest["app_bind"] = app_deployment + manifest["methods"] = [m for m in dir(app_info.app_class) if not m.startswith("_")] + ray_apps[app_info.id] = manifest + + print("Loaded apps:", ray_apps.keys()) + + assert len(ray_apps) > 0, "No apps loaded" + return ray_apps @serve.deployment -class HyphaApp: - def __init__(self, server_url, workspace, token, services): +class HyphaRayAppManager: + def __init__(self, server_url, workspace, token, ray_apps): self.server_url = server_url - self._services = services + self._apps = ray_apps self._hypha_server = connect_to_server({"server_url": server_url, "token": token, "workspace": workspace}) svc = { - "name": "Ray Functions", - "id": "ray-functions", + "name": "Hypha Ray Apps", + "id": "hypha-ray-apps", "config": { "visibility": "protected" }, } - def create_service_function(name, service_handle): + def create_service_function(name, app_bind, method_name): async def service_function(*args, **kwargs): - return await service_handle.translate.remote(*args, **kwargs) + method = getattr(app_bind, method_name) + return await method.remote(*args, **kwargs) service_function.__name__ = name return service_function - for service_name, service_bind in self._services.items(): - svc[service_name] = create_service_function(service_name, service_bind) + for app_id, app_info in self._apps.items(): + app_bind = app_info["app_bind"] + methods = app_info["methods"] + app_service = {} + for method in methods: + print(f"Registering method {method} for app {app_id}") + app_service[method] = create_service_function(method, app_bind, method) + + svc[app_id] = app_service info = self._hypha_server.register_service(svc, {"overwrite":True}) print("Hypha service info:", info) + print(f"Service URL: {self.server_url}/{workspace}/services/{info.id.split('/')[1]}") self.info = info async def __call__(self, request: Request): - redirect_url = f"{self.server_url}/{self.info.config.workspace}/services/{self.info.id.split('/')[1]}/translator?text=hello" - return HTMLResponse( - """ - - - - - -

Redirecting to Hypha...

-

Services:

- - - - """.format(redirect_url, "\n".join([f"
  • {name}
  • " for name in self._services.keys()])) - ) + # return a json object with the services + services = {} + for app_id, app_info in self._apps.items(): + services[app_id] = app_info["methods"] + return services current_dir = Path(os.path.dirname(os.path.realpath(__file__))) +ray_apps = load_all_apps(current_dir) -ray_apps = {} -apps_dir = current_dir / "ray_apps" -for app_file in apps_dir.iterdir(): - if app_file.is_file() and app_file.suffix == ".py" and app_file.stem != "__init__": - load_app(str(app_file)) - app_info = load_app(str(app_file)) - app_deployment = serve.deployment(name=app_info.name)(app_info.app_class).bind() - ray_apps[app_info.name] = app_deployment - -print("Loaded apps:", ray_apps.keys()) # Getting config from environment server_url = os.environ.get("HYPHA_SERVER_URL") workspace = os.environ.get("HYPHA_WORKSPACE") @@ -109,7 +138,8 @@ async def __call__(self, request: Request): assert server_url, "Server URL is not provided" -app = HyphaApp.bind(server_url, workspace, token, ray_apps) +app = HyphaRayAppManager.bind(server_url, workspace, token, ray_apps) + if __name__ == "__main__": serve.start() diff --git a/bioimageio/engine/ray_apps/cellpose.py b/bioimageio/engine/ray_apps/cellpose.py deleted file mode 100644 index e534c87..0000000 --- a/bioimageio/engine/ray_apps/cellpose.py +++ /dev/null @@ -1,15 +0,0 @@ -from hypha_rpc import api - -class CellposeModel: - def __init__(self): - # Load model - pass - - def predict(self, image: str) -> str: - pass - - def train(self, data: str, config: str) -> str: - pass - - -api.export(CellposeModel, {"name": "cellpose", "type": "ray"}) \ No newline at end of file diff --git a/bioimageio/engine/ray_apps/cellpose/__init__.py b/bioimageio/engine/ray_apps/cellpose/__init__.py new file mode 100644 index 0000000..5423dfe --- /dev/null +++ b/bioimageio/engine/ray_apps/cellpose/__init__.py @@ -0,0 +1,17 @@ +from hypha_rpc import api + +class CellposeModel: + def __init__(self): + # Load model + pass + + def predict(self, image: str) -> str: + prediction = "prediction of cellpose model on image" + image + return prediction + + def train(self, data: str, config: str) -> str: + training = "training cellpose model on data" + data + "with config" + config + return training + + +api.export(CellposeModel) diff --git a/bioimageio/engine/ray_apps/cellpose/manifest.yaml b/bioimageio/engine/ray_apps/cellpose/manifest.yaml new file mode 100644 index 0000000..5f1d394 --- /dev/null +++ b/bioimageio/engine/ray_apps/cellpose/manifest.yaml @@ -0,0 +1,14 @@ +name: Cellpose +id: cellpose +description: Cellpose is a generalist algorithm for cell and nucleus segmentation +runtime: ray +entrypoint: __init__.py +ray_serve_config: + ray_actor_options: + runtime_env: + pip: + - numpy + - torch + autoscaling_config: + min_replicas: 0 + max_replicas: 2 \ No newline at end of file diff --git a/bioimageio/engine/ray_apps/translator/manifest.yaml b/bioimageio/engine/ray_apps/translator/manifest.yaml new file mode 100644 index 0000000..cb83c73 --- /dev/null +++ b/bioimageio/engine/ray_apps/translator/manifest.yaml @@ -0,0 +1,13 @@ +name: Translator +id: translator +description: A simple translator that translates text from English to French +runtime: ray +entrypoint: translator.py +ray_serve_config: + ray_actor_options: + runtime_env: + pip: + - transformers + autoscaling_config: + min_replicas: 0 + max_replicas: 2 \ No newline at end of file diff --git a/bioimageio/engine/ray_apps/translator.py b/bioimageio/engine/ray_apps/translator/translator.py similarity index 90% rename from bioimageio/engine/ray_apps/translator.py rename to bioimageio/engine/ray_apps/translator/translator.py index 8664e1d..56d9397 100644 --- a/bioimageio/engine/ray_apps/translator.py +++ b/bioimageio/engine/ray_apps/translator/translator.py @@ -17,4 +17,4 @@ def translate(self, text: str) -> str: return translation -api.export(Translator, {"name": "translator"}) \ No newline at end of file +api.export(Translator) \ No newline at end of file