diff --git a/api/config/observability.go b/api/config/observability.go index 021d42020..26b485e72 100644 --- a/api/config/observability.go +++ b/api/config/observability.go @@ -1,5 +1,7 @@ package config +import "time" + // ObservabilityPublisher type ObservabilityPublisher struct { ArizeSink ArizeSink @@ -11,6 +13,7 @@ type ObservabilityPublisher struct { Replicas int32 TargetNamespace string ServiceAccountName string + DeploymentTimeout time.Duration `default:"30m"` } // KafkaConsumer diff --git a/api/models/model.go b/api/models/model.go index ac636db1d..fb0893f6a 100644 --- a/api/models/model.go +++ b/api/models/model.go @@ -60,7 +60,7 @@ type Model struct { ExperimentID ID `json:"mlflow_experiment_id" gorm:"column:mlflow_experiment_id"` Type string `json:"type" gorm:"type"` MlflowURL string `json:"mlflow_url" gorm:"-"` - ObservabilitySupported bool `json:"-" gorm:"column:observability_supported"` + ObservabilitySupported bool `json:"observability_supported" gorm:"column:observability_supported"` Endpoints []*ModelEndpoint `json:"endpoints" gorm:"foreignkey:ModelID;"` diff --git a/api/models/service.go b/api/models/service.go index fa7f83621..11d5106d5 100644 --- a/api/models/service.go +++ b/api/models/service.go @@ -127,24 +127,26 @@ func getPredictionLogTopicForVersion(project string, modelName string, modelVers func MergeProjectVersionLabels(projectLabels mlp.Labels, versionLabels KV) mlp.Labels { projectLabelsMap := map[string]int{} + updatedLabels := make(mlp.Labels, 0) for index, projectLabel := range projectLabels { projectLabelsMap[projectLabel.Key] = index + updatedLabels = append(updatedLabels, projectLabel) } for versionLabelKey, versionLabelValue := range versionLabels { if _, labelExists := projectLabelsMap[versionLabelKey]; labelExists { index := projectLabelsMap[versionLabelKey] - projectLabels[index].Value = fmt.Sprint(versionLabelValue) + updatedLabels[index].Value = fmt.Sprint(versionLabelValue) continue } - projectLabels = append(projectLabels, mlpclient.Label{ + updatedLabels = append(updatedLabels, mlpclient.Label{ Key: versionLabelKey, Value: fmt.Sprint(versionLabelValue), }) } - return projectLabels + return updatedLabels } func CreateInferenceServiceName(modelName, versionID, revisionID string) string { diff --git a/api/pkg/observability/deployment/deployment.go b/api/pkg/observability/deployment/deployment.go index e265afe89..4c58287c4 100644 --- a/api/pkg/observability/deployment/deployment.go +++ b/api/pkg/observability/deployment/deployment.go @@ -19,6 +19,10 @@ import ( "k8s.io/client-go/rest" ) +const ( + appLabelKey = "app" +) + type Manifest struct { Deployment *appsv1.Deployment Secret *corev1.Secret @@ -288,7 +292,7 @@ func (c *deployer) createSecretSpec(data *models.WorkerData) (*corev1.Secret, er ObjectMeta: metav1.ObjectMeta{ Name: c.getSecretName(data), Namespace: c.targetNamespace(), - Labels: data.Metadata.ToLabel(), + Labels: c.getLabels(data), }, StringData: map[string]string{ "config.yaml": string(consumerCfgStr), @@ -322,8 +326,15 @@ func (c *deployer) applyDeployment(ctx context.Context, data *models.WorkerData, return applyDeploymentFunc(data, secretName, true) } -func (c *deployer) createDeploymentSpec(ctx context.Context, data *models.WorkerData, secretName string) (*appsv1.Deployment, error) { +func (c *deployer) getLabels(data *models.WorkerData) map[string]string { labels := data.Metadata.ToLabel() + labels[appLabelKey] = data.Metadata.App + return labels +} + +func (c *deployer) createDeploymentSpec(ctx context.Context, data *models.WorkerData, secretName string) (*appsv1.Deployment, error) { + labels := c.getLabels(data) + cfgVolName := "config-volume" return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ @@ -337,11 +348,18 @@ func (c *deployer) createDeploymentSpec(ctx context.Context, data *models.Worker Spec: appsv1.DeploymentSpec{ Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ - "app": data.Metadata.App, + appLabelKey: data.Metadata.App, }, }, Replicas: &c.consumerConfig.Replicas, Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + Annotations: map[string]string{ + PublisherRevisionAnnotationKey: strconv.Itoa(data.Revision), + }, + }, + Spec: corev1.PodSpec{ Containers: []corev1.Container{ { @@ -386,9 +404,11 @@ func (c *deployer) createDeploymentSpec(ctx context.Context, data *models.Worker } func (c *deployer) waitUntilDeploymentReady(ctx context.Context, deployment *appsv1.Deployment, revision int) error { - deploymentv1 := c.kubeClient.AppsV1().Deployments(deployment.Name) - timeout := time.After(1 * time.Minute) - watcher, err := deploymentv1.Watch(ctx, metav1.ListOptions{}) + deploymentv1 := c.kubeClient.AppsV1().Deployments(deployment.Namespace) + timeout := time.After(c.consumerConfig.DeploymentTimeout) + watcher, err := deploymentv1.Watch(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", deployment.Name), + }) if err != nil { return err } diff --git a/api/pkg/observability/deployment/deployment_test.go b/api/pkg/observability/deployment/deployment_test.go index 39e525af2..68f78eeb7 100644 --- a/api/pkg/observability/deployment/deployment_test.go +++ b/api/pkg/observability/deployment/deployment_test.go @@ -3,6 +3,8 @@ package deployment import ( "context" "fmt" + "time" + "net/http" "reflect" "strconv" @@ -52,6 +54,7 @@ const ( func createDeploymentSpec(data *models.WorkerData, resourceRequest corev1.ResourceList, resourceLimit corev1.ResourceList, imageName string) *appsv1.Deployment { labels := data.Metadata.ToLabel() + labels[appLabelKey] = data.Metadata.App numReplicas := int32(2) cfgVolName := "config-volume" depl := &appsv1.Deployment{ @@ -70,6 +73,12 @@ func createDeploymentSpec(data *models.WorkerData, resourceRequest corev1.Resour }, }, Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + Annotations: map[string]string{ + PublisherRevisionAnnotationKey: strconv.Itoa(data.Revision), + }, + }, Spec: corev1.PodSpec{ ServiceAccountName: serviceAccountName, Containers: []corev1.Container{ @@ -201,6 +210,7 @@ func Test_deployer_Deploy(t *testing.T) { Replicas: 2, TargetNamespace: namespace, ServiceAccountName: serviceAccountName, + DeploymentTimeout: 5 * time.Second, } requestResource := corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("1"), diff --git a/api/queue/work/model_service_deployment_test.go b/api/queue/work/model_service_deployment_test.go index d2883a905..f91ed8dae 100644 --- a/api/queue/work/model_service_deployment_test.go +++ b/api/queue/work/model_service_deployment_test.go @@ -169,6 +169,10 @@ func TestExecuteDeployment(t *testing.T) { ResourceRequest: env.DefaultResourceRequest, VersionID: version.ID, Namespace: project.Name, + RevisionID: models.ID(1), + Status: models.EndpointRunning, + URL: fmt.Sprintf("%s-%d-1.example.com", model.Name, version.ID), + ServiceName: fmt.Sprintf("%s-%d-1.project.svc.cluster.local", model.Name, version.ID), EnableModelObservability: true, }, &models.Model{Name: "model", Project: project, ObservabilitySupported: true}).Return(nil) return producer @@ -225,6 +229,10 @@ func TestExecuteDeployment(t *testing.T) { ResourceRequest: env.DefaultResourceRequest, VersionID: version.ID, Namespace: project.Name, + RevisionID: models.ID(1), + Status: models.EndpointRunning, + URL: fmt.Sprintf("%s-%d-1.example.com", model.Name, version.ID), + ServiceName: fmt.Sprintf("%s-%d-1.project.svc.cluster.local", model.Name, version.ID), EnableModelObservability: true, }, &models.Model{Name: "model", Project: project, ObservabilitySupported: true}).Return(fmt.Errorf("producer error")) return producer @@ -669,11 +677,12 @@ func TestExecuteDeployment(t *testing.T) { }, } svc := &ModelServiceDeployment{ - ClusterControllers: controllers, - ImageBuilder: imgBuilder, - Storage: mockStorage, - DeploymentStorage: mockDeploymentStorage, - LoggerDestinationURL: loggerDestinationURL, + ClusterControllers: controllers, + ImageBuilder: imgBuilder, + Storage: mockStorage, + DeploymentStorage: mockDeploymentStorage, + LoggerDestinationURL: loggerDestinationURL, + ObservabilityEventProducer: tt.eventProducer, } err := svc.Deploy(job) diff --git a/api/queue/work/observability_publisher_deployment.go b/api/queue/work/observability_publisher_deployment.go index 61f0a5d7a..03387019c 100644 --- a/api/queue/work/observability_publisher_deployment.go +++ b/api/queue/work/observability_publisher_deployment.go @@ -77,7 +77,7 @@ func (op *ObservabilityPublisherDeployment) deploymentIsOngoing(ctx context.Cont if err != nil { return queue.RetryableError{Message: err.Error()} } - if deployedManifest.Deployment == nil { + if deployedManifest == nil { return nil } diff --git a/api/queue/work/observability_publisher_deployment_test.go b/api/queue/work/observability_publisher_deployment_test.go index bfb28a4bc..7435c9b18 100644 --- a/api/queue/work/observability_publisher_deployment_test.go +++ b/api/queue/work/observability_publisher_deployment_test.go @@ -57,7 +57,7 @@ func TestDeploy(t *testing.T) { ModelVersion: "1", Revision: 1, TopicSource: "caraml-project-1-model-1-1-prediction-log", - }).Return(&deployment.Manifest{}, nil) + }).Return(nil, nil) mockDeployer.On("Deploy", mock.Anything, &models.WorkerData{ Project: "project-1", ModelSchemaSpec: schemaSpec, @@ -131,7 +131,7 @@ func TestDeploy(t *testing.T) { ModelVersion: "1", Revision: 1, TopicSource: "caraml-project-1-model-1-1-prediction-log", - }).Return(&deployment.Manifest{}, nil) + }).Return(nil, nil) mockDeployer.On("Deploy", mock.Anything, &models.WorkerData{ Project: "project-1", ModelSchemaSpec: schemaSpec, @@ -240,7 +240,7 @@ func TestDeploy(t *testing.T) { ModelVersion: "1", Revision: 1, TopicSource: "caraml-project-1-model-1-1-prediction-log", - }).Return(&deployment.Manifest{}, nil) + }).Return(nil, nil) mockDeployer.On("Undeploy", mock.Anything, &models.WorkerData{ Project: "project-1", ModelSchemaSpec: schemaSpec, @@ -314,7 +314,7 @@ func TestDeploy(t *testing.T) { ModelVersion: "1", Revision: 1, TopicSource: "caraml-project-1-model-1-1-prediction-log", - }).Return(&deployment.Manifest{}, nil) + }).Return(nil, nil) mockDeployer.On("Undeploy", mock.Anything, &models.WorkerData{ Project: "project-1", ModelSchemaSpec: schemaSpec, diff --git a/db-migrations/39_observability_publisher.down.sql b/db-migrations/39_observability_publisher.down.sql index 778c78796..0acc2bd78 100644 --- a/db-migrations/39_observability_publisher.down.sql +++ b/db-migrations/39_observability_publisher.down.sql @@ -1 +1,2 @@ -DROP TABLE observability_publishers; \ No newline at end of file +DROP TABLE observability_publishers; +DROP TYPE publisher_status; \ No newline at end of file diff --git a/db-migrations/39_observability_publisher.up.sql b/db-migrations/39_observability_publisher.up.sql index b5d9df501..6b2a4fce6 100644 --- a/db-migrations/39_observability_publisher.up.sql +++ b/db-migrations/39_observability_publisher.up.sql @@ -1,4 +1,4 @@ -CREATE TYPE publisher_status as ENUM ('pending', 'running', 'failed', 'terminating'); +CREATE TYPE publisher_status as ENUM ('pending', 'running', 'failed', 'terminated'); CREATE TABLE IF NOT EXISTS observability_publishers ( diff --git a/python/sdk/test/pyfunc_integration_test.py b/python/sdk/test/pyfunc_integration_test.py index 69d2d0adf..fe14a8d45 100644 --- a/python/sdk/test/pyfunc_integration_test.py +++ b/python/sdk/test/pyfunc_integration_test.py @@ -14,6 +14,7 @@ import os import warnings +from functools import reduce from test.utils import undeploy_all_version import joblib @@ -21,6 +22,8 @@ import pytest import xgboost as xgb from merlin.model import ModelType, PyFuncModel, PyFuncV3Model +from merlin.model_schema import ModelSchema +from merlin.observability.inference import InferenceSchema, RegressionOutput, ValueType from merlin.pyfunc import ModelInput, ModelOutput, Values from merlin.resource_request import ResourceRequest from sklearn import svm @@ -63,12 +66,12 @@ def infer(self, model_input): class ModelObservabilityModel(PyFuncV3Model): def initialize(self, artifacts): self._feature_names = [ - "sepal length (cm)", - "sepal width (cm)", - "petal length (cm)", - "petal width (cm)", + "sepal_length", + "sepal_width", + "petal_length", + "petal_width", ] - self._target_names = ["setosa", "versicolor", "virginica"] + self._target_names = ["prediction_score"] self._model = xgb.Booster(model_file=artifacts["xgb_model"]) def preprocess(self, request: dict, **kwargs) -> ModelInput: @@ -81,9 +84,17 @@ def preprocess(self, request: dict, **kwargs) -> ModelInput: def infer(self, model_input: ModelInput) -> ModelOutput: dmatrix = xgb.DMatrix(model_input.features.data) outputs = self._model.predict(dmatrix).tolist() + + def max(first, second): + if first > second: + return first + return second + + prediction_outputs = [[reduce(max, row)] for row in outputs] + return ModelOutput( prediction_ids=model_input.prediction_ids, - predictions=Values(columns=self._target_names, data=outputs), + predictions=Values(columns=self._target_names, data=prediction_outputs), ) def postprocess(self, model_output: ModelOutput, request: dict) -> dict: @@ -234,13 +245,26 @@ def test_pyfunc_model_observability( merlin.set_project(project_name) merlin.set_model("pyfunc-mlobs", ModelType.PYFUNC_V3) + model_schema = ModelSchema( + spec=InferenceSchema( + feature_types={ + "sepal_length": ValueType.FLOAT64, + "sepal_width": ValueType.FLOAT64, + "petal_length": ValueType.FLOAT64, + "petal_width": ValueType.FLOAT64, + }, + model_prediction_output=RegressionOutput( + prediction_score_column="prediction_score", actual_score_column="actual" + ), + ) + ) undeploy_all_version() - with merlin.new_model_version() as v: + with merlin.new_model_version(model_schema=model_schema) as v: iris = load_iris() y = iris["target"] X = iris["data"] - xgb_path = train_xgboost_model(X, y) + xgb_path = train_xgboost_model(X, y) v.log_pyfunc_model( model_instance=ModelObservabilityModel(), conda_env="test/pyfunc/env.yaml",