Skip to content

Commit

Permalink
feat: Add more granular config for model observability during deploym…
Browse files Browse the repository at this point in the history
…ent (#619)

<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->
# Description
<!-- Briefly describe the motivation for the change. Please include
illustrations where appropriate. -->
Adding `model_observability` configuration that contains:
* Enable -> Whether the model observability is enabled or not
* Ground Truth Source -> The source for ground truth which later on will
be use to ingest ground truth
* Ground Truth Job -> The Job configuration to ingest ground truth
* Prediction Log Ingestion Resource -> Resource configuration for
observability publisher worker, currently we use default value that is
set on platform level
# Modifications
<!-- Summarize the key code changes. -->
* Modify swagger
* Regenerate python client from modified swagger spec
* Modify model_observability model
* Update deployment of observability publisher worker
* Update merlin sdk so user can supply `model_observability` config
during deployment

# Tests
<!-- Besides the existing / updated automated tests, what specific
scenarios should be tested? Consider the backward compatibility of the
changes, whether corner cases are covered, etc. Please describe the
tests and check the ones that have been completed. Eg:
- [x] Deploying new and existing standard models
- [ ] Deploying PyFunc models
-->

# Checklist
- [x] Added PR label
- [x] Added unit test, integration, and/or e2e tests
- [ ] Tested locally
- [ ] Updated documentation
- [x] Update Swagger spec if the PR introduce API changes
- [x] Regenerated Golang and Python client if the PR introduces API
changes

# Release Notes
<!--
Does this PR introduce a user-facing change?
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note

```
  • Loading branch information
tiopramayudi authored Dec 5, 2024
1 parent 2b0b062 commit 95d2e48
Show file tree
Hide file tree
Showing 34 changed files with 1,349 additions and 56 deletions.
9 changes: 8 additions & 1 deletion api/api/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,16 @@ func deploymentModeValidation(prev *models.VersionEndpoint, new *models.VersionE

func modelObservabilityValidation(endpoint *models.VersionEndpoint, model *models.Model) requestValidator {
return newFuncValidate(func() error {
if endpoint.EnableModelObservability && !slices.Contains(supportedObservabilityModelTypes, model.Type) {
if !endpoint.IsModelMonitoringEnabled() {
return nil
}
if !slices.Contains(supportedObservabilityModelTypes, model.Type) {
return fmt.Errorf("%s: %w", model.Type, ErrUnsupportedObservabilityModelType)
}

if !model.ObservabilitySupported {
return fmt.Errorf("model observability is not supported for this model")
}
return nil
})
}
150 changes: 126 additions & 24 deletions api/api/version_endpoints_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,14 +1073,15 @@ func TestCreateEndpoint(t *testing.T) {
modelService: func() *mocks.ModelsService {
svc := &mocks.ModelsService{}
svc.On("FindByID", mock.Anything, models.ID(1)).Return(&models.Model{
ID: models.ID(1),
Name: "model-1",
ProjectID: models.ID(1),
Project: mlp.Project{},
ExperimentID: 1,
Type: "pyfunc",
MlflowURL: "",
Endpoints: nil,
ID: models.ID(1),
Name: "model-1",
ProjectID: models.ID(1),
Project: mlp.Project{},
ExperimentID: 1,
Type: "pyfunc",
MlflowURL: "",
Endpoints: nil,
ObservabilitySupported: true,
}, nil)
return svc
},
Expand Down Expand Up @@ -1199,6 +1200,105 @@ func TestCreateEndpoint(t *testing.T) {
},
},
},
{
desc: "Fail when try to enable model observability but the model is not supported yet",
vars: map[string]string{
"model_id": "1",
"version_id": "1",
},
requestBody: &models.VersionEndpoint{
ID: uuid,
VersionID: models.ID(1),
VersionModelID: models.ID(1),
ServiceName: "sample",
Namespace: "sample",
EnvironmentName: "dev",
Message: "",
ResourceRequest: &models.ResourceRequest{
MinReplica: 1,
MaxReplica: 4,
CPURequest: resource.MustParse("1"),
MemoryRequest: resource.MustParse("1Gi"),
},
EnvVars: models.EnvVars([]models.EnvVar{
{
Name: "WORKER",
Value: "1",
},
}),
EnableModelObservability: true,
},
modelService: func() *mocks.ModelsService {
svc := &mocks.ModelsService{}
svc.On("FindByID", mock.Anything, models.ID(1)).Return(&models.Model{
ID: models.ID(1),
Name: "model-1",
ProjectID: models.ID(1),
Project: mlp.Project{},
ExperimentID: 1,
Type: "pyfunc",
MlflowURL: "",
Endpoints: nil,
ObservabilitySupported: false,
}, nil)
return svc
},
versionService: func() *mocks.VersionsService {
svc := &mocks.VersionsService{}
svc.On("FindByID", mock.Anything, models.ID(1), models.ID(1), mock.Anything).Return(&models.Version{
ID: models.ID(1),
ModelID: models.ID(1),
Model: &models.Model{
ID: models.ID(1),
Name: "model-1",
ProjectID: models.ID(1),
Project: mlp.Project{},
ExperimentID: 1,
Type: "pyfunc",
MlflowURL: "",
Endpoints: nil,
},
}, nil)
return svc
},
envService: func() *mocks.EnvironmentService {
svc := &mocks.EnvironmentService{}
svc.On("GetDefaultEnvironment").Return(&models.Environment{
ID: models.ID(1),
Name: "dev",
Cluster: "dev",
IsDefault: &trueBoolean,
Region: "id",
GcpProject: "dev-proj",
MaxCPU: "1",
MaxMemory: "1Gi",
}, nil)
svc.On("GetEnvironment", "dev").Return(&models.Environment{
ID: models.ID(1),
Name: "dev",
Cluster: "dev",
IsDefault: &trueBoolean,
Region: "id",
GcpProject: "dev-proj",
MaxCPU: "1",
MaxMemory: "1Gi",
}, nil)
return svc
},
endpointService: func() *mocks.EndpointsService {
svc := &mocks.EndpointsService{}
svc.On("CountEndpoints", context.Background(), mock.Anything, mock.Anything).Return(0, nil)
return svc
},
monitoringConfig: config.MonitoringConfig{},
feastCoreMock: func() *feastmocks.CoreServiceClient {
return &feastmocks.CoreServiceClient{}
},
expected: &Response{
code: http.StatusBadRequest,
data: Error{Message: "Request validation failed: model observability is not supported for this model"},
},
},
{
desc: "Should return 400 if UPI is not supported",
vars: map[string]string{
Expand Down Expand Up @@ -3909,14 +4009,15 @@ func TestUpdateEndpoint(t *testing.T) {
modelService: func() *mocks.ModelsService {
svc := &mocks.ModelsService{}
svc.On("FindByID", context.Background(), models.ID(1)).Return(&models.Model{
ID: models.ID(1),
Name: "model-1",
ProjectID: models.ID(1),
Project: mlp.Project{},
ExperimentID: 1,
Type: "pyfunc",
MlflowURL: "",
Endpoints: nil,
ID: models.ID(1),
Name: "model-1",
ProjectID: models.ID(1),
Project: mlp.Project{},
ExperimentID: 1,
Type: "pyfunc",
MlflowURL: "",
Endpoints: nil,
ObservabilitySupported: true,
}, nil)
return svc
},
Expand Down Expand Up @@ -4600,14 +4701,15 @@ func TestUpdateEndpoint(t *testing.T) {
modelService: func() *mocks.ModelsService {
svc := &mocks.ModelsService{}
svc.On("FindByID", context.Background(), models.ID(1)).Return(&models.Model{
ID: models.ID(1),
Name: "model-1",
ProjectID: models.ID(1),
Project: mlp.Project{},
ExperimentID: 1,
Type: "tensorflow",
MlflowURL: "",
Endpoints: nil,
ID: models.ID(1),
Name: "model-1",
ProjectID: models.ID(1),
Project: mlp.Project{},
ExperimentID: 1,
Type: "tensorflow",
MlflowURL: "",
Endpoints: nil,
ObservabilitySupported: true,
}, nil)
return svc
},
Expand Down
6 changes: 3 additions & 3 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/confluentinc/confluent-kafka-go/v2 v2.0.2
github.com/coocood/freecache v1.2.3
github.com/fatih/color v1.15.0
github.com/fatih/color v1.18.0
github.com/feast-dev/feast/sdk/go v0.9.4
github.com/fraugster/parquet-go v0.10.0
github.com/ghodss/yaml v1.0.0
Expand Down Expand Up @@ -182,7 +182,7 @@ require (
github.com/magiconair/properties v1.8.5 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
Expand Down Expand Up @@ -228,7 +228,7 @@ require (
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/tools v0.19.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
github.com/feast-dev/feast/sdk/go v0.9.4 h1:ChGdqbNiaBbcm/X1TRBkikAnbhJFAp0ofzs1TmBA/M4=
github.com/feast-dev/feast/sdk/go v0.9.4/go.mod h1:RWG8U+ri5d9CEE6jGPwfaIr5TxNGisZvlCMwGYfmGn4=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
Expand Down Expand Up @@ -800,8 +800,8 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
Expand Down Expand Up @@ -1406,8 +1406,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220906165534-d0df966e6959/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
57 changes: 57 additions & 0 deletions api/models/model_observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package models

import (
"database/sql/driver"
"encoding/json"
"errors"

"k8s.io/apimachinery/pkg/api/resource"
)

// ModelObservability represents the configuration for model observability features.
type ModelObservability struct {
Enabled bool `json:"enabled"`
GroundTruthSource *GroundTruthSource `json:"ground_truth_source"`
GroundTruthJob *GroundTruthJob `json:"ground_truth_job"`
PredictionLogIngestionResourceRequest *WorkerResourceRequest `json:"prediction_log_ingestion_resource_request"`
}

// GroundTruthSource represents the source configuration for ground truth data.
type GroundTruthSource struct {
TableURN string `json:"table_urn"`
EventTimestampColumn string `json:"event_timestamp_column"`
SourceProject string `json:"source_project"`
}

// GroundTruthJob represents the configuration for a scheduled job.
type GroundTruthJob struct {
CronSchedule string `json:"cron_schedule"`
CPURequest string `json:"cpu_request"`
CPULimit *string `json:"cpu_limit"`
MemoryRequest string `json:"memory_request"`
MemoryLimit *string `json:"memory_limit"`
StartDayOffsetFromNow int `json:"start_day_offset_from_now"`
EndDayOffsetFromNow int `json:"end_day_offset_from_now"`
GracePeriodDay int `json:"grace_period_day"`
ServiceAccountSecretName string `json:"service_account_secret_name"`
}

// WorkerResourceRequest represents the resource request for a worker (prediction log ingestion kafka consumer) deployment.
type WorkerResourceRequest struct {
CPURequest *resource.Quantity `json:"cpu_request"`
MemoryRequest *resource.Quantity `json:"memory_request"`
Replica int32 `json:"replica"`
}

func (mlob ModelObservability) Value() (driver.Value, error) {
return json.Marshal(mlob)
}

func (mlob *ModelObservability) Scan(value interface{}) error {
b, ok := value.([]byte)
if !ok {
return errors.New("type assertion to []byte failed")
}

return json.Unmarshal(b, &mlob)
}
10 changes: 6 additions & 4 deletions api/models/observability_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ type WorkerData struct {
ModelVersion string
Revision int
TopicSource string
ResourceRequest *WorkerResourceRequest
}

func NewWorkerData(modelVersion *Version, model *Model, observabilityPublisher *ObservabilityPublisher) *WorkerData {
func NewWorkerData(modelVersion *Version, model *Model, observabilityPublisher *ObservabilityPublisher, resourceRequest *WorkerResourceRequest) *WorkerData {
return &WorkerData{
ModelName: model.Name,
Project: model.Project.Name,
Expand All @@ -54,9 +55,10 @@ func NewWorkerData(modelVersion *Version, model *Model, observabilityPublisher *
Team: model.Project.Team,
Labels: model.Project.Labels,
},
ModelVersion: modelVersion.ID.String(),
Revision: observabilityPublisher.Revision,
TopicSource: getPredictionLogTopicForVersion(model.Project.Name, model.Name, modelVersion.ID.String()),
ModelVersion: modelVersion.ID.String(),
Revision: observabilityPublisher.Revision,
TopicSource: getPredictionLogTopicForVersion(model.Project.Name, model.Name, modelVersion.ID.String()),
ResourceRequest: resourceRequest,
}
}

Expand Down
2 changes: 1 addition & 1 deletion api/models/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewService(model *Model, version *Version, modelOpt *ModelOption, endpoint
AutoscalingPolicy: endpoint.AutoscalingPolicy,
Protocol: endpoint.Protocol,
CurrentIsvcName: endpoint.InferenceServiceName,
EnabledModelObservability: endpoint.EnableModelObservability,
EnabledModelObservability: endpoint.IsModelMonitoringEnabled(),
ModelSchema: version.ModelSchema,
PredictorUPIOverHTTPEnabled: predictorUPIOverHTTPEnabled(endpoint.Transformer, endpoint.Protocol),
}
Expand Down
15 changes: 13 additions & 2 deletions api/models/version_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ type VersionEndpoint struct {
// AutoscalingPolicy controls the conditions when autoscaling should be triggered
AutoscalingPolicy *autoscaling.AutoscalingPolicy `json:"autoscaling_policy" gorm:"autoscaling_policy"`
// Protocol to be used when deploying the model
Protocol protocol.Protocol `json:"protocol" gorm:"protocol"`
EnableModelObservability bool `json:"enable_model_observability" gorm:"enable_model_observability"`
Protocol protocol.Protocol `json:"protocol" gorm:"protocol"`
// EnableModelObservability flag indicate whether the version endpoint should enable model observability
// This flag will be deprecated in the future, please use ModelObservability.Enabled instead
EnableModelObservability bool `json:"enable_model_observability" gorm:"enable_model_observability"`
// ModelObservability configuration
ModelObservability *ModelObservability `json:"model_observability" gorm:"model_observability"`
CreatedUpdated
}

Expand Down Expand Up @@ -127,6 +131,13 @@ func (ve *VersionEndpoint) IsServing() bool {
return ve.Status == EndpointServing
}

func (ve *VersionEndpoint) IsModelMonitoringEnabled() bool {
if ve.ModelObservability == nil {
return ve.EnableModelObservability
}
return ve.ModelObservability.Enabled
}

func (ve *VersionEndpoint) Hostname() string {
if ve.URL == "" {
return ""
Expand Down
Loading

0 comments on commit 95d2e48

Please sign in to comment.