diff --git a/clearml/router/__init__.py b/clearml/router/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/clearml/router/endpoint_telemetry.py b/clearml/router/endpoint_telemetry.py new file mode 100644 index 00000000..b82d6720 --- /dev/null +++ b/clearml/router/endpoint_telemetry.py @@ -0,0 +1,235 @@ +from ..task import Task +from ..utilities.resource_monitor import ResourceMonitor +import uuid +import time +import copy +from threading import Thread + + +class EndpointTelemetry: + BACKEND_STAT_MAP = { + "cpu_usage_*": "cpu_usage", + "cpu_temperature_*": "cpu_temperature", + "disk_free_percent": "disk_free_home", + "io_read_mbs": "disk_read", + "io_write_mbs": "disk_write", + "network_tx_mbs": "network_tx", + "network_rx_mbs": "network_rx", + "memory_free_gb": "memory_free", + "memory_used_gb": "memory_used", + "gpu_temperature_*": "gpu_temperature", + "gpu_mem_used_gb_*": "gpu_memory_used", + "gpu_mem_free_gb_*": "gpu_memory_free", + "gpu_utilization_*": "gpu_usage", + } + + def __init__( + self, + endpoint_name="endpoint", + model_name="model", + model=None, + model_url=None, + model_source=None, + model_version=None, + app_id=None, + app_instance=None, + tags=None, + system_tags=None, + container_id=None, + input_size=None, + input_type="str", + report_statistics=True, + endpoint_url=None, + preprocess_artifact=None, + force_register=False + ): + self.report_window = 30 + self._previous_readouts = {} + self._previous_readouts_ts = time.time() + self._num_readouts = 0 + self.container_info = { + "container_id": container_id or str(uuid.uuid4()).replace("-", ""), + "endpoint_name": endpoint_name, + "model_name": model_name, + "model_source": model_source, + "model_version": model_version, + "preprocess_artifact": preprocess_artifact, + "input_type": str(input_type), + "input_size": str(input_size), + "tags": tags, + "system_tags": system_tags, + "endpoint_url": endpoint_url + } + references = [] + if app_id: + references.append({"type": "app_id", "value": app_id}) + if app_instance: + references.append({"type": "app_instance", "value": app_instance}) + references.append({"type": "task", "value": Task.current_task().id}) + if model: + references.append({"type": "model", "value": model}) + if model_url: + references.append({"type": "url", "value": model_url}) + self.container_info["reference"] = references + self.session = Task._get_default_session() + self.requests_num = 0 + self.requests_num_window = 0 + self.requests_num_prev_window = 0 + self.latency_sum_window = 0 + self.uptime_timestamp = time.time() + self.last_request_time = None + # use readily available resource monitor, otherwise create one (can happen in spawned subprocesses) + self.resource_monitor = Task.current_task()._resource_monitor or ResourceMonitor(Task.current_task()) + if not container_id and not force_register: + self.register_container() + self._stop_container_status_report_daemon = False + if report_statistics: + Thread(target=self.container_status_report_daemon, daemon=True).start() + + def stop(self): + self._stop_container_status_report_daemon = True + + def update( + self, + endpoint_name=None, + model_name=None, + model=None, + model_url=None, + model_source=None, + model_version=None, + tags=None, + system_tags=None, + input_size=None, + input_type=None, + endpoint_url=None, + preprocess_artifact=None, + ): + update_dict = {} + if endpoint_name is not None: + update_dict["endpoint_name"] = endpoint_name + if model_name is not None: + update_dict["model_name"] = model_name + if model_source is not None: + update_dict["model_source"] = model_source + if model_version is not None: + update_dict["model_version"] = model_version + if preprocess_artifact is not None: + update_dict["preprocess_artifact"] = preprocess_artifact + if input_type is not None: + update_dict["input_type"] = input_type + if input_size is not None: + update_dict["input_size"] = input_size + if tags is not None: + update_dict["tags"] = tags + if system_tags is not None: + update_dict["system_tags"] = system_tags + if endpoint_url is not None: + update_dict["endpoint_url"] = endpoint_url + self.container_info.update(update_dict) + references_to_add = {} + if model: + references_to_add["model"] = {"type": "model", "value": model} + if model_url: + references_to_add["model_url"] = {"type": "url", "value": model_url} + for reference in self.container_info["reference"]: + if reference["type"] in references_to_add: + reference["value"] = references_to_add[reference["type"]]["value"] + references_to_add.pop(reference["type"], None) + self.container_info["reference"].extend(list(references_to_add.values())) + + def register_container(self): + result = self.session.send_request("serving", "register_container", json=self.container_info) + if result.status_code != 200: + print("Failed registering container: {}".format(result.json())) + + def wait_for_endpoint_url(self): + while not self.container_info.get("endpoint_url"): + Task.current_task().reload() + endpoint = Task.current_task()._get_runtime_properties().get("endpoint") + if endpoint: + self.container_info["endpoint_url"] = endpoint + self.uptime_timestamp = time.time() + else: + time.sleep(1) + + def get_machine_stats(self): + def create_general_key(old_key): + return "{}_*".format(old_key) + + stats = self.resource_monitor._machine_stats() + elapsed = time.time() - self._previous_readouts_ts + self._previous_readouts_ts = time.time() + updates = {} + for k, v in stats.items(): + if k.endswith("_mbs"): + v = (v - self._previous_readouts.get(k, v)) / elapsed + updates[k] = v + self._previous_readouts = copy.deepcopy(stats) + stats.update(updates) + self._num_readouts += 1 + + preprocessed_stats = {} + ordered_keys = sorted(stats.keys()) + for k in ordered_keys: + v = stats[k] + if k in ["memory_used_gb", "memory_free_gb"]: + v *= 1024 + if isinstance(v, float): + v = round(v, 3) + stat_key = self.BACKEND_STAT_MAP.get(k) + if stat_key: + preprocessed_stats[stat_key] = v + else: + general_key = create_general_key(k) + if general_key.startswith("gpu"): + prev_general_key = general_key + general_key = "_".join(["gpu"] + general_key.split("_")[2:]) + if general_key == "gpu_mem_used_gb_*": + gpu_index = prev_general_key.split("_")[1] + mem_usage = min(stats["gpu_{}_mem_usage".format(gpu_index)], 99.99) + mem_free = stats["gpu_{}_mem_free_gb".format(gpu_index)] + v = (mem_usage * mem_free) / (100 - mem_usage) + if general_key in ["gpu_mem_used_gb_*", "gpu_mem_free_gb_*"]: + v *= 1024 + general_key = self.BACKEND_STAT_MAP.get(general_key) + if general_key: + preprocessed_stats.setdefault(general_key, []).append(v) + return preprocessed_stats + + def container_status_report_daemon(self): + while not self._stop_container_status_report_daemon: + self.container_status_report() + time.sleep(self.report_window) + + def container_status_report(self): + self.wait_for_endpoint_url() + status_report = {**self.container_info} + status_report["uptime_sec"] = int(time.time() - self.uptime_timestamp) + status_report["requests_num"] = self.requests_num + status_report["requests_min"] = self.requests_num_window + self.requests_num_prev_window + status_report["latency_ms"] = ( + 0 if (self.requests_num_window == 0) else (self.latency_sum_window / self.requests_num_window) + ) + status_report["machine_stats"] = self.get_machine_stats() + self.requests_num_prev_window = self.requests_num_window + self.requests_num_window = 0 + self.latency_sum_window = 0 + self.latency_num_window = 0 + result = self.session.send_request("serving", "container_status_report", json=status_report) + if result.status_code != 200: + print("Failed sending status report: {}".format(result.json())) + + def update_last_request_time(self): + self.last_request_time = time.time() + + def update_statistics(self): + self.requests_num += 1 + self.requests_num_window += 1 + latency = (time.time() - self.last_request_time) * 1000 + self.latency_sum_window += latency + + def on_request(self): + self.update_last_request_time() + + def on_response(self): + self.update_statistics() diff --git a/clearml/router/fastapi_proxy.py b/clearml/router/fastapi_proxy.py new file mode 100644 index 00000000..b3b4b7f3 --- /dev/null +++ b/clearml/router/fastapi_proxy.py @@ -0,0 +1,184 @@ +from fastapi import FastAPI, Request, Response +from typing import Optional +from multiprocessing import Process +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.routing import Match +import functools +import threading +import httpx +import uvicorn +from .route import Route +from ..utilities.process.mp import SafeQueue + + +class FastAPIProxy: + ALL_REST_METHODS = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"] + + def __init__(self, port, workers=None, default_target=None): + self.app = None + self.routes = {} + self.port = port + self.message_queue = SafeQueue() + self.uvicorn_subprocess = None + self.workers = workers + self._default_target = default_target + self._default_session = None + self._in_subprocess = False + + def _create_default_route(self): + proxy = self + + class DefaultRouteMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request: Request, call_next): + scope = { + "type": "http", + "method": request.method, + "path": request.url.path, + "root_path": "", + "headers": request.headers.raw, + "query_string": request.url.query.encode("utf-8"), + "client": request.client, + "server": request.scope.get("server"), + "scheme": request.url.scheme, + "extensions": request.scope.get("extensions", {}), + "app": request.scope.get("app"), + } + for route in proxy.app.router.routes: + if route.matches(scope)[0] == Match.FULL: + return await call_next(request) + proxied_response = await proxy._default_session.request( + method=request.method, + url=proxy._default_target + request.url.path, + headers=dict(request.headers), + content=await request.body(), + params=request.query_params, + ) + return Response( + content=proxied_response.content, + headers=dict(proxied_response.headers), + status_code=proxied_response.status_code, + ) + self.app.add_middleware(DefaultRouteMiddleware) + + async def proxy( + self, + request: Request, + path: Optional[str] = None, + source_path: Optional[str] = None, + ): + route_data = self.routes.get(source_path) + if not route_data: + return Response(status_code=404) + + request = route_data.on_request(request) + proxied_response = await route_data.session.request( + method=request.method, + url=f"{route_data.target_url}/{path}" if path else route_data.target_url, + headers=dict(request.headers), + content=await request.body(), + params=request.query_params, + ) + proxied_response = Response( + content=proxied_response.content, + headers=dict(proxied_response.headers), + status_code=proxied_response.status_code, + ) + return route_data.on_response(proxied_response, request) + + def add_route( + self, + source, + target, + request_callback=None, + response_callback=None, + endpoint_telemetry=True + ): + if not self._in_subprocess: + self.message_queue.put( + { + "method": "add_route", + "kwargs": { + "source": source, + "target": target, + "request_callback": request_callback, + "response_callback": response_callback, + "endpoint_telemetry": endpoint_telemetry + }, + } + ) + return + should_add_route = False + if source not in self.routes: + should_add_route = True + else: + self.routes[source].stop_endpoint_telemetry() + self.routes[source] = Route( + target, + request_callback=request_callback, + response_callback=response_callback, + session=httpx.AsyncClient(timeout=None), + ) + if endpoint_telemetry is True: + endpoint_telemetry = {} + if endpoint_telemetry is not False: + self.routes[source].set_endpoint_telemetry_args(**endpoint_telemetry) + if self._in_subprocess: + self.routes[source].start_endpoint_telemetry() + if should_add_route: + self.app.add_api_route( + source, + functools.partial( + self.proxy, + source_path=source, + ), + methods=self.ALL_REST_METHODS, + ) + self.app.add_api_route( + source.rstrip("/") + "/{path:path}", + functools.partial( + self.proxy, + source_path=source, + ), + methods=self.ALL_REST_METHODS, + ) + return self.routes[source] + + def remove_route(self, source): + if not self._in_subprocess: + self.message_queue.put({"method": "remove_route", "kwargs": {"source": source}}) + return + route = self.routes.get(source) + if route: + route.stop_endpoint_telemetry() + if source in self.routes: + # we are not popping the key to prevent calling self.app.add_api_route multiple times + # when self.add_route is called on the same source_path after removal + self.routes[source] = None + + def _start(self): + self._in_subprocess = True + self.app = FastAPI() + if self._default_target: + self._default_session = httpx.AsyncClient(timeout=None) + self._create_default_route() + for route in self.routes.values(): + route.start_endpoint_telemetry() + threading.Thread(target=self._rpc_manager, daemon=True).start() + uvicorn.run(self.app, port=self.port, host="0.0.0.0", workers=self.workers) + + def _rpc_manager(self): + while True: + message = self.message_queue.get() + if message["method"] == "add_route": + self.add_route(**message["kwargs"]) + elif message["method"] == "remove_route": + self.remove_route(**message["kwargs"]) + + def start(self): + self.uvicorn_subprocess = Process(target=self._start) + self.uvicorn_subprocess.start() + + def stop(self): + if self.uvicorn_subprocess: + self.uvicorn_subprocess.terminate() + self.uvicorn_subprocess = None diff --git a/clearml/router/proxy.py b/clearml/router/proxy.py new file mode 100644 index 00000000..50b44201 --- /dev/null +++ b/clearml/router/proxy.py @@ -0,0 +1,35 @@ +from .fastapi_proxy import FastAPIProxy + + +class HttpProxy: + DEFAULT_PORT = 9000 + + def __init__(self, port=None, workers=None, default_target=None): + # at the moment, only a fastapi proxy is supported + self.base_proxy = FastAPIProxy(port or self.DEFAULT_PORT, workers=workers, default_target=default_target) + self.base_proxy.start() + self.port = port + self.routes = {} + + def add_route(self, source, target, request_callback=None, response_callback=None, endpoint_telemetry=True): + self.routes[source] = self.base_proxy.add_route( + source=source, + target=target, + request_callback=request_callback, + response_callback=response_callback, + endpoint_telemetry=endpoint_telemetry, + ) + return self.routes[source] + + def remove_route(self, source): + self.routes.pop(source, None) + self.base_proxy.remove_route(source) + + def get_routes(self): + return self.routes + + def start(self): + self.base_proxy.start() + + def stop(self): + self.base_proxy.stop() diff --git a/clearml/router/route.py b/clearml/router/route.py new file mode 100644 index 00000000..7c0d59b5 --- /dev/null +++ b/clearml/router/route.py @@ -0,0 +1,79 @@ +from .endpoint_telemetry import EndpointTelemetry + + +class Route: + def __init__(self, target_url, request_callback=None, response_callback=None, session=None): + self.target_url = target_url + self.request_callback = request_callback + self.response_callback = response_callback + self.session = session + self.persistent_state = {} + self._endpoint_telemetry = None + self._endpoint_telemetry_args = None + + def set_endpoint_telemetry_args( + self, + endpoint_name="endpoint", + model_name="model", + model=None, + model_url=None, + model_source=None, + model_version=None, + app_id=None, + app_instance=None, + tags=None, + system_tags=None, + container_id=None, + input_size=None, + input_type="str", + report_statistics=True, + endpoint_url=None, + preprocess_artifact=None, + force_register=False + ): + self._endpoint_telemetry_args = dict( + endpoint_name=endpoint_name, + model_name=model_name, + model=model, + model_url=model_url, + model_source=model_source, + model_version=model_version, + app_id=app_id, + app_instance=app_instance, + tags=tags, + system_tags=system_tags, + container_id=container_id, + input_size=input_size, + input_type=input_type, + report_statistics=report_statistics, + endpoint_url=endpoint_url, + preprocess_artifact=preprocess_artifact, + force_register=force_register + ) + + def start_endpoint_telemetry(self): + if self._endpoint_telemetry is not None or self._endpoint_telemetry_args is None: + return + self._endpoint_telemetry = EndpointTelemetry(**self._endpoint_telemetry_args) + + def stop_endpoint_telemetry(self): + if self._endpoint_telemetry is None: + return + self._endpoint_telemetry.stop() + self._endpoint_telemetry = None + + def on_request(self, request): + new_request = request + if self.request_callback: + new_request = self.request_callback(request, persistent_state=self.persistent_state) or request + if self._endpoint_telemetry: + self._endpoint_telemetry.on_request() + return new_request + + def on_response(self, response, request): + new_response = response + if self.response_callback: + new_response = self.response_callback(response, request, persistent_state=self.persistent_state) or response + if self._endpoint_telemetry: + self._endpoint_telemetry.on_response() + return new_response diff --git a/clearml/router/router.py b/clearml/router/router.py new file mode 100644 index 00000000..8e06fa5d --- /dev/null +++ b/clearml/router/router.py @@ -0,0 +1,203 @@ +from typing import Optional, Callable, Dict, Union # noqa +from fastapi import Request, Response # noqa +from .proxy import HttpProxy + + +class HttpRouter: + """ + A router class to manage HTTP routing for an application. + Allows the creation, deployment, and management of local and external endpoints, + as well as the configuration of a local proxy for traffic routing. + + Example usage: + + .. code-block:: py + def request_callback(request, persistent_state): + persistent_state["last_request_time"] = time.time() + + def response_callback(response, request, persistent_state): + print("Latency:", time.time() - persistent_state["last_request_time"]) + if urllib.parse.urlparse(str(request.url).rstrip("/")).path == "/modify": + new_content = response.body.replace(b"modify", b"modified") + headers = copy.deepcopy(response.headers) + headers["Content-Length"] = str(len(new_content)) + return Response(status_code=response.status_code, headers=headers, content=new_content) + + router = Task.current_task().get_http_router() + router.set_local_proxy_parameters(incoming_port=9000) + router.create_local_route( + source="/", + target="http://localhost:8000", + request_callback=request_callback, + response_callback=response_callback, + endpoint_telemetry={"model": "MyModel"} + ) + router.deploy(wait=True) + """ + _instance = None + + def __init__(self, task): + """ + Do not use directly. Use `Task.get_router` instead + """ + self._task = task + self._external_endpoint_port = None + self._proxy = None + self._proxy_params = {"port": HttpProxy.DEFAULT_PORT} + + def set_local_proxy_parameters(self, incoming_port=None, default_target=None): + # type: (Optional[int], Optional[str]) -> () + """ + Set the parameters with which the local proxy is initialized + + :param incoming_port: The incoming port of the proxy + :param default_target: If None, no default target is set. Otherwise, route all traffic + that doesn't match a local route created via `create_local_route` to this target + """ + self._proxy_params["port"] = incoming_port or HttpProxy.DEFAULT_PORT + self._proxy_params["default_target"] = default_target + + def start_local_proxy(self): + """ + Start the local proxy without deploying the router, i.e. requesting an external endpoint + """ + self._proxy = self._proxy or HttpProxy(**self._proxy_params) + + def create_local_route( + self, + source, # type: str + target, # type: str + request_callback=None, # type: Callable[Request, Dict] + response_callback=None, # type: Callable[Response, Request, Dict] + endpoint_telemetry=True # type: Union[bool, Dict] + ): + """ + Create a local route from a source to a target through a proxy. If no proxy instance + exists, one is automatically created. + This function enables routing traffic between the source and target endpoints, optionally + allowing custom callbacks to handle requests and responses or to gather telemetry data + at the endpoint. + To customize proxy parameters, use the `Router.set_local_proxy_parameters` method. + By default, the proxy binds to port 9000 for incoming requests. + + :param source: The source path for routing the traffic. For example, `/` will intercept + all the traffic sent to the proxy, while `/example` will only intercept the calls + that have `/example` as the path prefix. + :param target: The target URL where the intercepted traffic is routed. + :param request_callback: A function used to process each request before it is forwarded to the target. + The callback must have the following parameters: + - request - The intercepted FastAPI request + - persistent_state - A dictionary meant to be used as a caching utility object. + Shared with `response_callback` + The callback can return a FastAPI Request, in which case this request will be forwarded to the target + :param response_callback: A function used to process each response before it is returned by the proxy. + The callback must have the following parameters: + - response - The FastAPI response + - request - The FastAPI request (after being preprocessed by the proxy) + - persistent_state - A dictionary meant to be used as a caching utility object. + Shared with `request_callback` + The callback can return a FastAPI Response, in which case this response will be forwarded + :param endpoint_telemetry: If True, enable endpoint telemetry. If False, disable it. + If a dictionary is passed, enable endpoint telemetry with custom parameters. + The parameters are: + - endpoint_url - URL to the endpoint, mandatory if no external URL has been requested + - endpoint_name - name of the endpoint + - model_name - name of the model served by the endpoint + - model - referenced model + - model_url - URL to the model + - model_source - Source of the model + - model_version - Model version + - app_id - App ID, if used inside a ClearML app + - app_instance - The ID of the instance the ClearML app is running + - tags - ClearML tags + - system_tags - ClearML system tags + - container_id - Container ID, should be unique + - input_size - input size of the model + - input_type - input type expected by the model/endpoint + - report_statistics - whether or not to report statistics + """ + self.start_local_proxy() + self._proxy.add_route( + source, + target, + request_callback=request_callback, + response_callback=response_callback, + endpoint_telemetry=endpoint_telemetry, + ) + + def remove_local_route(self, source): + # type: (str) -> () + """ + Remove a local route. If endpoint telemetry is enabled for that route, disable it + + :param source: Remove route based on the source path used to route the traffic + """ + if self._proxy: + self._proxy.remove_route(source) + + def deploy( + self, wait=False, wait_interval_seconds=3.0, wait_timeout_seconds=90.0 + ): + # type: (Optional[int], str, bool, float, float) -> Optional[Dict] + """ + Start the local HTTP proxy and request an external endpoint for an application + + :param port: Port the application is listening to. If no port is supplied, a local proxy + will be created. To control the proxy parameters, use `Router.set_local_proxy_parameters`. + To control create local routes through the proxy, use `Router.create_local_route`. + By default, the incoming port bound by the proxy is 9000 + :param protocol: As of now, only `http` is supported + :param wait: If True, wait for the endpoint to be assigned + :param wait_interval_seconds: The poll frequency when waiting for the endpoint + :param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint, + the method will no longer wait and None will be returned + + :return: If wait is False, this method will return None. + If no endpoint could be found while waiting, this mehtod returns None. + Otherwise, it returns a dictionary containing the following values: + - endpoint - raw endpoint. One might need to authenticate in order to use this endpoint + - browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser + - port - the port exposed by the application + - protocol - the protocol used by the endpo"int + """ + self._proxy = self._proxy or HttpProxy(**self._proxy_params) + return self._task.request_external_endpoint( + port=self._proxy.port, + protocol="http", + wait=wait, + wait_interval_seconds=wait_interval_seconds, + wait_timeout_seconds=wait_timeout_seconds, + ) + + def wait_for_external_endpoint(self, wait_interval_seconds=3.0, wait_timeout_seconds=90.0): + # type: (float) -> Optional[Dict] + """ + Wait for an external endpoint to be assigned + + :param wait_interval_seconds: The poll frequency when waiting for the endpoint + :param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint, + the method will no longer wait + + :return: If no endpoint could be found while waiting, this mehtod returns None. + Otherwise, it returns a dictionary containing the following values: + - endpoint - raw endpoint. One might need to authenticate in order to use this endpoint + - browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser + - port - the port exposed by the application + - protocol - the protocol used by the endpoint + """ + return self._task.wait_for_external_endpoint( + protocol="http", wait_interval_seconds=wait_interval_seconds, wait_timeout_seconds=wait_timeout_seconds + ) + + def list_external_endpoints(self): + # type: () -> List[Dict] + """ + List all external endpoints assigned + + :return: A list of dictionaries. Each dictionary contains the following values: + - endpoint - raw endpoint. One might need to authenticate in order to use this endpoint + - browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser + - port - the port exposed by the application + - protocol - the protocol used by the endpoint + """ + return self._task.list_external_endpoints(protocol="http") diff --git a/clearml/task.py b/clearml/task.py index c4a87e18..59b4bd70 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -113,6 +113,7 @@ import pandas import numpy from PIL import Image + from .router.router import HttpRouter # Forward declaration to help linters TaskInstance = TypeVar("TaskInstance", bound="Task") @@ -224,6 +225,7 @@ def __init__(self, private=None, **kwargs): self._calling_filename = None self._remote_functions_generated = {} self._external_endpoint_ports = {} + self._http_router = None # register atexit, so that we mark the task as stopped self._at_exit_called = False @@ -848,6 +850,49 @@ def should_connect(*keys): task._set_startup_info() return task + def get_http_router(self): + # type: () -> HttpRouter + """ + Retrieve an instance of `HttpRouter` to manage an external HTTP endpoint and intercept traffic. + The `HttpRouter` serves as a traffic manager, enabling the creation and configuration of local and external + routesto redirect, monitor, or manipulate HTTP requests and responses. It is designed to handle routing + needs such via a proxy setup which handles request/response interception and telemetry reporting for + applications that require HTTP endpoint management. + + Example usage: + + .. code-block:: py + def request_callback(request, persistent_state): + persistent_state["last_request_time"] = time.time() + + def response_callback(response, request, persistent_state): + print("Latency:", time.time() - persistent_state["last_request_time"]) + if urllib.parse.urlparse(str(request.url).rstrip("/")).path == "/modify": + new_content = response.body.replace(b"modify", b"modified") + headers = copy.deepcopy(response.headers) + headers["Content-Length"] = str(len(new_content)) + return Response(status_code=response.status_code, headers=headers, content=new_content) + + router = Task.current_task().get_http_router() + router.set_local_proxy_parameters(incoming_port=9000) + router.create_local_route( + source="/", + target="http://localhost:8000", + request_callback=request_callback, + response_callback=response_callback, + endpoint_telemetry={"model": "MyModel"} + ) + router.deploy(wait=True) + """ + try: + from .router.router import HttpRouter # noqa + except ImportError: + raise UsageError("Could not import `HttpRouter`. Please run `pip install clearml[router]`") + + if self._http_router is None: + self._http_router = HttpRouter(self) + return self._http_router + def request_external_endpoint( self, port, protocol="http", wait=False, wait_interval_seconds=3.0, wait_timeout_seconds=90.0 ): @@ -996,11 +1041,14 @@ def _wait_for_external_endpoint( return None time.sleep(wait_interval_seconds) - def list_external_endpoints(self): - # type: () -> List[Dict] + def list_external_endpoints(self, protocol=None): + # type: (Optional[str]) -> List[Dict] """ List all external endpoints assigned + :param protocol: If None, list all external endpoints. Otherwise, only list endpoints + that use this protocol + :return: A list of dictionaries. Each dictionary contains the following values: - endpoint - raw endpoint. One might need to authenticate in order to use this endpoint - browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser @@ -1010,7 +1058,8 @@ def list_external_endpoints(self): Session.verify_feature_set("advanced") runtime_props = self._get_runtime_properties() results = [] - for protocol in ["http", "tcp"]: + protocols = [protocol] if protocol is not None else ["http", "tcp"] + for protocol in protocols: internal_port = runtime_props.get(self._external_endpoint_internal_port_map[protocol]) if internal_port: self._external_endpoint_ports[protocol] = internal_port diff --git a/examples/router/http_router.py b/examples/router/http_router.py new file mode 100644 index 00000000..e70d1a52 --- /dev/null +++ b/examples/router/http_router.py @@ -0,0 +1,42 @@ +""" +Example on how to use the ClearML HTTP router. +For this example, you would first need a webserver to route the traffic to: +`simple_webserver.py` launches such a server. Running the script will start a +webserver, bound to localhost:8000. + +Then, when running this example, it creates a router which binds to 0.0.0.0:9000. +A local route is then created, which will proxy all traffic from +`http://:9000/example_source` to `http://localhost:8000/serve`. + +Trafic can be intercepted both on request and response via callbacks. See +`request_callback` and `response_callback`. + +By default, the route traffic is monitored and telemetry is sent to the ClearML +server. To disable this, pass `endpoint_telemetry=False` when creating the route +""" + +import time +from clearml import Task + + +def request_callback(request, persistent_state): + persistent_state["last_request_time"] = time.time() + + +def response_callback(response, request, persistent_state): + print("Latency:", time.time() - persistent_state["last_request_time"]) + + +if __name__ == "__main__": + task = Task.init(project_name="Router Example", task_name="Router Example") + router = task.get_http_router() + router.set_local_proxy_parameters(incoming_port=9000, default_target="http://localhost:8000") + router.create_local_route( + source="/example_source", + target="http://localhost:8000/serve", # route traffic to this address + request_callback=request_callback, # intercept requests + response_callback=response_callback, # intercept responses + endpoint_telemetry={"model": "MyModel"} # set this to False to disable telemetry + ) + router.deploy(wait=True) + # run `curl http://localhost:9000/example_source/1` diff --git a/examples/router/requirements.txt b/examples/router/requirements.txt new file mode 100644 index 00000000..7792a35a --- /dev/null +++ b/examples/router/requirements.txt @@ -0,0 +1,2 @@ +clearml +clearml[router] diff --git a/examples/router/simple_webserver.py b/examples/router/simple_webserver.py new file mode 100644 index 00000000..851b87a4 --- /dev/null +++ b/examples/router/simple_webserver.py @@ -0,0 +1,44 @@ +""" +A simple webserver, used as a tool to showcase the capabilities of +ClearML HTTP router. See `http_router.py` for more details. +""" + + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +import uvicorn + +app = FastAPI() + +actions = { + "1": {"name": "Action 1", "description": "This is model action 1"}, + "2": {"name": "Action 2", "description": "This is model action 2"}, + "3": {"name": "Action 3", "description": "This is model action 3"}, +} + + +class Item(BaseModel): + name: str + description: str + + +@app.get("/") +def read_root(): + return {"message": "Welcome to the FastAPI application!"} + + +@app.get("/serve/{action}", response_model=Item) +def read_item(action: str): + if action in actions: + return actions[action] + else: + raise HTTPException(status_code=404, detail="Item not found") + + +if __name__ == "__main__": + uvicorn.run( + "simple_webserver:app", + host="127.0.0.1", + port=8000, + reload=True + ) diff --git a/setup.py b/setup.py index 79a3ce7e..10ae2e5e 100644 --- a/setup.py +++ b/setup.py @@ -87,6 +87,11 @@ def read_version_string(version_file): 'gs': [ 'google-cloud-storage>=1.13.2', ], + 'router': [ + 'fastapi>=0.115.2', + 'uvicorn>=0.31.1', + 'httpx>=0.27.2' + ] }, package_data={ 'clearml': ['config/default/*.conf', 'backend_api/config/default/*.conf']