Skip to content

Commit

Permalink
Fix watcher for deployment manifest
Browse files Browse the repository at this point in the history
  • Loading branch information
tiopramayudi committed Mar 15, 2024
1 parent d5b0bf2 commit 5e84d6d
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 30 deletions.
3 changes: 3 additions & 0 deletions api/config/observability.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package config

import "time"

// ObservabilityPublisher
type ObservabilityPublisher struct {
ArizeSink ArizeSink
Expand All @@ -11,6 +13,7 @@ type ObservabilityPublisher struct {
Replicas int32
TargetNamespace string
ServiceAccountName string
DeploymentTimeout time.Duration `default:"30m"`
}

// KafkaConsumer
Expand Down
2 changes: 1 addition & 1 deletion api/models/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Model struct {
ExperimentID ID `json:"mlflow_experiment_id" gorm:"column:mlflow_experiment_id"`
Type string `json:"type" gorm:"type"`
MlflowURL string `json:"mlflow_url" gorm:"-"`
ObservabilitySupported bool `json:"-" gorm:"column:observability_supported"`
ObservabilitySupported bool `json:"observability_supported" gorm:"column:observability_supported"`

Endpoints []*ModelEndpoint `json:"endpoints" gorm:"foreignkey:ModelID;"`

Expand Down
8 changes: 5 additions & 3 deletions api/models/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,24 +127,26 @@ func getPredictionLogTopicForVersion(project string, modelName string, modelVers

func MergeProjectVersionLabels(projectLabels mlp.Labels, versionLabels KV) mlp.Labels {
projectLabelsMap := map[string]int{}
updatedLabels := make(mlp.Labels, 0)
for index, projectLabel := range projectLabels {
projectLabelsMap[projectLabel.Key] = index
updatedLabels = append(updatedLabels, projectLabel)
}

for versionLabelKey, versionLabelValue := range versionLabels {
if _, labelExists := projectLabelsMap[versionLabelKey]; labelExists {
index := projectLabelsMap[versionLabelKey]
projectLabels[index].Value = fmt.Sprint(versionLabelValue)
updatedLabels[index].Value = fmt.Sprint(versionLabelValue)
continue
}

projectLabels = append(projectLabels, mlpclient.Label{
updatedLabels = append(updatedLabels, mlpclient.Label{
Key: versionLabelKey,
Value: fmt.Sprint(versionLabelValue),
})
}

return projectLabels
return updatedLabels
}

func CreateInferenceServiceName(modelName, versionID, revisionID string) string {
Expand Down
32 changes: 26 additions & 6 deletions api/pkg/observability/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
"k8s.io/client-go/rest"
)

const (
appLabelKey = "app"
)

type Manifest struct {
Deployment *appsv1.Deployment
Secret *corev1.Secret
Expand Down Expand Up @@ -288,7 +292,7 @@ func (c *deployer) createSecretSpec(data *models.WorkerData) (*corev1.Secret, er
ObjectMeta: metav1.ObjectMeta{
Name: c.getSecretName(data),
Namespace: c.targetNamespace(),
Labels: data.Metadata.ToLabel(),
Labels: c.getLabels(data),
},
StringData: map[string]string{
"config.yaml": string(consumerCfgStr),
Expand Down Expand Up @@ -322,8 +326,15 @@ func (c *deployer) applyDeployment(ctx context.Context, data *models.WorkerData,
return applyDeploymentFunc(data, secretName, true)
}

func (c *deployer) createDeploymentSpec(ctx context.Context, data *models.WorkerData, secretName string) (*appsv1.Deployment, error) {
func (c *deployer) getLabels(data *models.WorkerData) map[string]string {
labels := data.Metadata.ToLabel()
labels[appLabelKey] = data.Metadata.App
return labels
}

func (c *deployer) createDeploymentSpec(ctx context.Context, data *models.WorkerData, secretName string) (*appsv1.Deployment, error) {
labels := c.getLabels(data)

cfgVolName := "config-volume"
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -337,11 +348,18 @@ func (c *deployer) createDeploymentSpec(ctx context.Context, data *models.Worker
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": data.Metadata.App,
appLabelKey: data.Metadata.App,
},
},
Replicas: &c.consumerConfig.Replicas,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Annotations: map[string]string{
PublisherRevisionAnnotationKey: strconv.Itoa(data.Revision),
},
},

Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Expand Down Expand Up @@ -386,9 +404,11 @@ func (c *deployer) createDeploymentSpec(ctx context.Context, data *models.Worker
}

func (c *deployer) waitUntilDeploymentReady(ctx context.Context, deployment *appsv1.Deployment, revision int) error {
deploymentv1 := c.kubeClient.AppsV1().Deployments(deployment.Name)
timeout := time.After(1 * time.Minute)
watcher, err := deploymentv1.Watch(ctx, metav1.ListOptions{})
deploymentv1 := c.kubeClient.AppsV1().Deployments(deployment.Namespace)
timeout := time.After(c.consumerConfig.DeploymentTimeout)
watcher, err := deploymentv1.Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", deployment.Name),
})
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions api/pkg/observability/deployment/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package deployment
import (
"context"
"fmt"
"time"

"net/http"
"reflect"
"strconv"
Expand Down Expand Up @@ -52,6 +54,7 @@ const (

func createDeploymentSpec(data *models.WorkerData, resourceRequest corev1.ResourceList, resourceLimit corev1.ResourceList, imageName string) *appsv1.Deployment {
labels := data.Metadata.ToLabel()
labels[appLabelKey] = data.Metadata.App
numReplicas := int32(2)
cfgVolName := "config-volume"
depl := &appsv1.Deployment{
Expand All @@ -70,6 +73,12 @@ func createDeploymentSpec(data *models.WorkerData, resourceRequest corev1.Resour
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Annotations: map[string]string{
PublisherRevisionAnnotationKey: strconv.Itoa(data.Revision),
},
},
Spec: corev1.PodSpec{
ServiceAccountName: serviceAccountName,
Containers: []corev1.Container{
Expand Down Expand Up @@ -201,6 +210,7 @@ func Test_deployer_Deploy(t *testing.T) {
Replicas: 2,
TargetNamespace: namespace,
ServiceAccountName: serviceAccountName,
DeploymentTimeout: 5 * time.Second,
}
requestResource := corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
Expand Down
19 changes: 14 additions & 5 deletions api/queue/work/model_service_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func TestExecuteDeployment(t *testing.T) {
ResourceRequest: env.DefaultResourceRequest,
VersionID: version.ID,
Namespace: project.Name,
RevisionID: models.ID(1),
Status: models.EndpointRunning,
URL: fmt.Sprintf("%s-%d-1.example.com", model.Name, version.ID),
ServiceName: fmt.Sprintf("%s-%d-1.project.svc.cluster.local", model.Name, version.ID),
EnableModelObservability: true,
}, &models.Model{Name: "model", Project: project, ObservabilitySupported: true}).Return(nil)
return producer
Expand Down Expand Up @@ -225,6 +229,10 @@ func TestExecuteDeployment(t *testing.T) {
ResourceRequest: env.DefaultResourceRequest,
VersionID: version.ID,
Namespace: project.Name,
RevisionID: models.ID(1),
Status: models.EndpointRunning,
URL: fmt.Sprintf("%s-%d-1.example.com", model.Name, version.ID),
ServiceName: fmt.Sprintf("%s-%d-1.project.svc.cluster.local", model.Name, version.ID),
EnableModelObservability: true,
}, &models.Model{Name: "model", Project: project, ObservabilitySupported: true}).Return(fmt.Errorf("producer error"))
return producer
Expand Down Expand Up @@ -669,11 +677,12 @@ func TestExecuteDeployment(t *testing.T) {
},
}
svc := &ModelServiceDeployment{
ClusterControllers: controllers,
ImageBuilder: imgBuilder,
Storage: mockStorage,
DeploymentStorage: mockDeploymentStorage,
LoggerDestinationURL: loggerDestinationURL,
ClusterControllers: controllers,
ImageBuilder: imgBuilder,
Storage: mockStorage,
DeploymentStorage: mockDeploymentStorage,
LoggerDestinationURL: loggerDestinationURL,
ObservabilityEventProducer: tt.eventProducer,
}

err := svc.Deploy(job)
Expand Down
2 changes: 1 addition & 1 deletion api/queue/work/observability_publisher_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (op *ObservabilityPublisherDeployment) deploymentIsOngoing(ctx context.Cont
if err != nil {
return queue.RetryableError{Message: err.Error()}
}
if deployedManifest.Deployment == nil {
if deployedManifest == nil {
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions api/queue/work/observability_publisher_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestDeploy(t *testing.T) {
ModelVersion: "1",
Revision: 1,
TopicSource: "caraml-project-1-model-1-1-prediction-log",
}).Return(&deployment.Manifest{}, nil)
}).Return(nil, nil)
mockDeployer.On("Deploy", mock.Anything, &models.WorkerData{
Project: "project-1",
ModelSchemaSpec: schemaSpec,
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestDeploy(t *testing.T) {
ModelVersion: "1",
Revision: 1,
TopicSource: "caraml-project-1-model-1-1-prediction-log",
}).Return(&deployment.Manifest{}, nil)
}).Return(nil, nil)
mockDeployer.On("Deploy", mock.Anything, &models.WorkerData{
Project: "project-1",
ModelSchemaSpec: schemaSpec,
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestDeploy(t *testing.T) {
ModelVersion: "1",
Revision: 1,
TopicSource: "caraml-project-1-model-1-1-prediction-log",
}).Return(&deployment.Manifest{}, nil)
}).Return(nil, nil)
mockDeployer.On("Undeploy", mock.Anything, &models.WorkerData{
Project: "project-1",
ModelSchemaSpec: schemaSpec,
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestDeploy(t *testing.T) {
ModelVersion: "1",
Revision: 1,
TopicSource: "caraml-project-1-model-1-1-prediction-log",
}).Return(&deployment.Manifest{}, nil)
}).Return(nil, nil)
mockDeployer.On("Undeploy", mock.Anything, &models.WorkerData{
Project: "project-1",
ModelSchemaSpec: schemaSpec,
Expand Down
3 changes: 2 additions & 1 deletion db-migrations/39_observability_publisher.down.sql
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DROP TABLE observability_publishers;
DROP TABLE observability_publishers;
DROP TYPE publisher_status;
2 changes: 1 addition & 1 deletion db-migrations/39_observability_publisher.up.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TYPE publisher_status as ENUM ('pending', 'running', 'failed', 'terminating');
CREATE TYPE publisher_status as ENUM ('pending', 'running', 'failed', 'terminated');

CREATE TABLE IF NOT EXISTS observability_publishers
(
Expand Down
40 changes: 32 additions & 8 deletions python/sdk/test/pyfunc_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

import os
import warnings
from functools import reduce
from test.utils import undeploy_all_version

import joblib
import numpy as np
import pytest
import xgboost as xgb
from merlin.model import ModelType, PyFuncModel, PyFuncV3Model
from merlin.model_schema import ModelSchema
from merlin.observability.inference import InferenceSchema, RegressionOutput, ValueType
from merlin.pyfunc import ModelInput, ModelOutput, Values
from merlin.resource_request import ResourceRequest
from sklearn import svm
Expand Down Expand Up @@ -63,12 +66,12 @@ def infer(self, model_input):
class ModelObservabilityModel(PyFuncV3Model):
def initialize(self, artifacts):
self._feature_names = [
"sepal length (cm)",
"sepal width (cm)",
"petal length (cm)",
"petal width (cm)",
"sepal_length",
"sepal_width",
"petal_length",
"petal_width",
]
self._target_names = ["setosa", "versicolor", "virginica"]
self._target_names = ["prediction_score"]
self._model = xgb.Booster(model_file=artifacts["xgb_model"])

def preprocess(self, request: dict, **kwargs) -> ModelInput:
Expand All @@ -81,9 +84,17 @@ def preprocess(self, request: dict, **kwargs) -> ModelInput:
def infer(self, model_input: ModelInput) -> ModelOutput:
dmatrix = xgb.DMatrix(model_input.features.data)
outputs = self._model.predict(dmatrix).tolist()

def max(first, second):
if first > second:
return first
return second

prediction_outputs = [[reduce(max, row)] for row in outputs]

return ModelOutput(
prediction_ids=model_input.prediction_ids,
predictions=Values(columns=self._target_names, data=outputs),
predictions=Values(columns=self._target_names, data=prediction_outputs),
)

def postprocess(self, model_output: ModelOutput, request: dict) -> dict:
Expand Down Expand Up @@ -234,13 +245,26 @@ def test_pyfunc_model_observability(
merlin.set_project(project_name)
merlin.set_model("pyfunc-mlobs", ModelType.PYFUNC_V3)

model_schema = ModelSchema(
spec=InferenceSchema(
feature_types={
"sepal_length": ValueType.FLOAT64,
"sepal_width": ValueType.FLOAT64,
"petal_length": ValueType.FLOAT64,
"petal_width": ValueType.FLOAT64,
},
model_prediction_output=RegressionOutput(
prediction_score_column="prediction_score", actual_score_column="actual"
),
)
)
undeploy_all_version()
with merlin.new_model_version() as v:
with merlin.new_model_version(model_schema=model_schema) as v:
iris = load_iris()
y = iris["target"]
X = iris["data"]
xgb_path = train_xgboost_model(X, y)

xgb_path = train_xgboost_model(X, y)
v.log_pyfunc_model(
model_instance=ModelObservabilityModel(),
conda_env="test/pyfunc/env.yaml",
Expand Down

0 comments on commit 5e84d6d

Please sign in to comment.