diff --git a/python/pyfunc-server/examples/echo-http/__init__.py b/python/pyfunc-server/examples/echo_http/__init__.py
similarity index 100%
rename from python/pyfunc-server/examples/echo-http/__init__.py
rename to python/pyfunc-server/examples/echo_http/__init__.py
diff --git a/python/pyfunc-server/examples/echo-http/echo_http.py b/python/pyfunc-server/examples/echo_http/echo_http.py
similarity index 91%
rename from python/pyfunc-server/examples/echo-http/echo_http.py
rename to python/pyfunc-server/examples/echo_http/echo_http.py
index 10133d277..e0b42f20c 100644
--- a/python/pyfunc-server/examples/echo-http/echo_http.py
+++ b/python/pyfunc-server/examples/echo_http/echo_http.py
@@ -18,7 +18,6 @@ def infer(self, request):
 merlin.run_pyfunc_model(
     model_instance=EchoModel(),
     conda_env="env.yaml",
-    pyfunc_base_image="ghcr.io/caraml-dev/merlin/merlin-pyfunc-base:0.38.1",
 )
 
 # Or, if you already have logged existing model version on Merlin,
diff --git a/python/pyfunc-server/examples/echo-http/env.yaml b/python/pyfunc-server/examples/echo_http/env.yaml
similarity index 100%
rename from python/pyfunc-server/examples/echo-http/env.yaml
rename to python/pyfunc-server/examples/echo_http/env.yaml
diff --git a/python/pyfunc-server/examples/echo_upi/README.md b/python/pyfunc-server/examples/echo_upi/README.md
new file mode 100644
index 000000000..e0cf3ac34
--- /dev/null
+++ b/python/pyfunc-server/examples/echo_upi/README.md
@@ -0,0 +1,13 @@
+# Echo UPI Model Examples
+
+Run the server locally:
+
+```
+python upi_server.py
+```
+
+In different terminal session, run the client:
+
+```
+python upi_client.py
+```
diff --git a/python/pyfunc-server/examples/echo_upi/__init__.py b/python/pyfunc-server/examples/echo_upi/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/python/pyfunc-server/examples/echo-upi/env.yaml b/python/pyfunc-server/examples/echo_upi/env.yaml
similarity index 56%
rename from python/pyfunc-server/examples/echo-upi/env.yaml
rename to python/pyfunc-server/examples/echo_upi/env.yaml
index 28c16b4c5..f035843cc 100644
--- a/python/pyfunc-server/examples/echo-upi/env.yaml
+++ b/python/pyfunc-server/examples/echo_upi/env.yaml
@@ -1,4 +1,2 @@
-channels:
-  - defaults
 dependencies:
   - python=3.10
diff --git a/python/pyfunc-server/examples/echo_upi/upi_client.py b/python/pyfunc-server/examples/echo_upi/upi_client.py
new file mode 100644
index 000000000..1d55effa3
--- /dev/null
+++ b/python/pyfunc-server/examples/echo_upi/upi_client.py
@@ -0,0 +1,29 @@
+import grpc
+import pandas as pd
+from caraml.upi.utils import df_to_table
+from caraml.upi.v1 import upi_pb2, upi_pb2_grpc
+
+
+def create_upi_request() -> upi_pb2.PredictValuesRequest:
+    target_name = "echo"
+    df = pd.DataFrame(
+        [[4, 1, "hi"]] * 3,
+        columns=["int_value", "int_value_2", "string_value"],
+        index=["0000", "1111", "2222"],
+    )
+    prediction_id = "12345"
+
+    return upi_pb2.PredictValuesRequest(
+        target_name=target_name,
+        prediction_table=df_to_table(df, "predict"),
+        metadata=upi_pb2.RequestMetadata(prediction_id=prediction_id),
+    )
+
+
+if __name__ == "__main__":
+    channel = grpc.insecure_channel(f"localhost:8080")
+    stub = upi_pb2_grpc.UniversalPredictionServiceStub(channel)
+
+    request = create_upi_request()
+    response = stub.PredictValues(request=request)
+    print(response)
diff --git a/python/pyfunc-server/examples/echo-upi/__init__.py b/python/pyfunc-server/examples/echo_upi/upi_server.py
similarity index 87%
rename from python/pyfunc-server/examples/echo-upi/__init__.py
rename to python/pyfunc-server/examples/echo_upi/upi_server.py
index 538bbbdee..3dd1fccf0 100644
--- a/python/pyfunc-server/examples/echo-upi/__init__.py
+++ b/python/pyfunc-server/examples/echo_upi/upi_server.py
@@ -2,7 +2,7 @@
 import os
 
 import grpc
-import mlflow
+import merlin
 from caraml.upi.v1 import upi_pb2
 from merlin.model import PyFuncModel
 from prometheus_client import Counter, Gauge
@@ -42,6 +42,11 @@ def upiv1_infer(
 if __name__ == "__main__":
     model_name = "echo-model"
     model_version = "1"
-    mlflow.pyfunc.log_model(
-        "model", python_model=EchoUPIModel(model_name, model_version)
+
+    merlin.run_pyfunc_model(
+        model_instance=EchoUPIModel(model_name, model_version),
+        conda_env="env.yaml",
+        env_vars={
+            "CARAML_PROTOCOL": "UPI_V1",
+        },
     )
diff --git a/python/sdk/merlin/model.py b/python/sdk/merlin/model.py
index e0724b629..f535b42ca 100644
--- a/python/sdk/merlin/model.py
+++ b/python/sdk/merlin/model.py
@@ -1469,6 +1469,8 @@ def start_server(
                 model_name=self.model.name,
                 model_version=f"{self.id}",
                 pyfunc_base_image=pyfunc_base_image,
+                port=port,
+                env_vars=env_vars,
                 debug=debug,
             )
             return
diff --git a/python/sdk/merlin/pyfunc.py b/python/sdk/merlin/pyfunc.py
index 06a3318a4..b852edf6d 100644
--- a/python/sdk/merlin/pyfunc.py
+++ b/python/sdk/merlin/pyfunc.py
@@ -402,6 +402,7 @@ def run_pyfunc_model(
     artifacts: Dict[str, str] = None,
     pyfunc_base_image: str = None,
     port: int = 8080,
+    env_vars: Dict[str, str] = None,
     debug: bool = False,
 ):
     """
@@ -413,6 +414,7 @@
     :param artifacts: dictionary of artifact that will be stored together with the model. This will be passed to PythonModel.initialize. Example: {"config": "config/staging.yaml"}
     :param pyfunc_base_image: base image for building pyfunc model
     :param port: port to expose the model
+    :param env_vars: dictionary of environment variables to be passed to the server
     :param debug: flag to enable debug mode that will print docker build log
     """
 
@@ -449,6 +451,7 @@
         model_version="dev",
         pyfunc_base_image=pyfunc_base_image,
         port=port,
+        env_vars=env_vars,
         debug=debug,
     )
 
@@ -461,6 +464,7 @@ def run_pyfunc_local_server(
     model_version: str,
     pyfunc_base_image: str = None,
     port: int = 8080,
+    env_vars: Dict[str, str] = None,
     debug: bool = False,
 ):
     if pyfunc_base_image is None:
@@ -501,6 +505,7 @@
         model_version=model_version,
         model_full_name=f"{model_name}-{model_version}",
         port=port,
+        env_vars=env_vars,
     )
 
 
@@ -540,7 +545,14 @@ def _build_image(
     wait_build_complete(logs, debug)
 
 
-def _run_container(image_tag, model_name, model_version, model_full_name, port):
+def _run_container(
+    image_tag,
+    model_name,
+    model_version,
+    model_full_name,
+    port,
+    env_vars: Dict[str, str] = None,
+):
     docker_client = docker.from_env()
 
     # Stop all previous containers to avoid port conflict
@@ -551,18 +563,22 @@ def _run_container(image_tag, model_name, model_version, model_full_name, port):
         started_container.remove(force=True)
 
     try:
-        env_vars = {}
         env_vars["CARAML_HTTP_PORT"] = "8080"
+        env_vars["CARAML_GRPC_PORT"] = "9000"
         env_vars["CARAML_MODEL_NAME"] = model_name
         env_vars["CARAML_MODEL_VERSION"] = model_version
         env_vars["CARAML_MODEL_FULL_NAME"] = model_full_name
         env_vars["WORKERS"] = "1"
 
+        ports = {"8080/tcp": port}
+        if "CARAML_PROTOCOL" in env_vars and env_vars["CARAML_PROTOCOL"] == "UPI_V1":
+            ports = {"9000/tcp": port}
+
         container = docker_client.containers.run(
             image=image_tag,
             name=model_name,
             labels={"managed-by": "merlin"},
-            ports={"8080/tcp": port},
+            ports=ports,
             environment=env_vars,
             detach=True,
             remove=True,
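
For reference, a minimal sketch (not part of the diff) of how the new `env_vars` and `port` plumbing is meant to be used from the SDK side. It assumes the `merlin` SDK and a local Docker daemon are available; the host port `9090` and the import of `EchoUPIModel` from the new example module are illustrative assumptions.

```python
# Sketch only: run the UPI echo example locally on a non-default host port,
# assuming the merlin SDK is installed and a Docker daemon is running.
import merlin

from upi_server import EchoUPIModel  # example model class added by this diff

merlin.run_pyfunc_model(
    model_instance=EchoUPIModel("echo-model", "1"),
    conda_env="env.yaml",
    # New parameter introduced by this diff: forwarded into the container's
    # environment by _run_container().
    env_vars={"CARAML_PROTOCOL": "UPI_V1"},
    # With CARAML_PROTOCOL=UPI_V1, the container's gRPC port 9000 (rather than
    # HTTP port 8080) is published on this host port.
    port=9090,  # illustrative; defaults to 8080
)
```

A client such as the new `upi_client.py` would then open its channel against `localhost:9090` instead of the hard-coded `localhost:8080`.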