Skip to content

Commit

Permalink
Add more granular config for model observability during deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
tiopramayudi committed Nov 22, 2024
1 parent 0877a4f commit a5a3e0d
Show file tree
Hide file tree
Showing 32 changed files with 1,206 additions and 31 deletions.
2 changes: 1 addition & 1 deletion api/api/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func deploymentModeValidation(prev *models.VersionEndpoint, new *models.VersionE

func modelObservabilityValidation(endpoint *models.VersionEndpoint, model *models.Model) requestValidator {
return newFuncValidate(func() error {
if endpoint.EnableModelObservability && !slices.Contains(supportedObservabilityModelTypes, model.Type) {
if endpoint.IsModelMonitoringEnabled() && !slices.Contains(supportedObservabilityModelTypes, model.Type) {
return fmt.Errorf("%s: %w", model.Type, ErrUnsupportedObservabilityModelType)
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/confluentinc/confluent-kafka-go/v2 v2.0.2
github.com/coocood/freecache v1.2.3
github.com/fatih/color v1.15.0
github.com/fatih/color v1.18.0
github.com/feast-dev/feast/sdk/go v0.9.4
github.com/fraugster/parquet-go v0.10.0
github.com/ghodss/yaml v1.0.0
Expand Down Expand Up @@ -183,7 +183,7 @@ require (
github.com/magiconair/properties v1.8.5 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
Expand Down Expand Up @@ -229,7 +229,7 @@ require (
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/tools v0.19.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
github.com/feast-dev/feast/sdk/go v0.9.4 h1:ChGdqbNiaBbcm/X1TRBkikAnbhJFAp0ofzs1TmBA/M4=
github.com/feast-dev/feast/sdk/go v0.9.4/go.mod h1:RWG8U+ri5d9CEE6jGPwfaIr5TxNGisZvlCMwGYfmGn4=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
Expand Down Expand Up @@ -759,8 +759,8 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
Expand Down Expand Up @@ -1365,8 +1365,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220906165534-d0df966e6959/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
57 changes: 57 additions & 0 deletions api/models/model_observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package models

import (
"database/sql/driver"
"encoding/json"
"errors"

"k8s.io/apimachinery/pkg/api/resource"
)

// ModelObservability represents the configuration for model observability features.
// It is persisted as a single JSON document via its Value and Scan methods.
type ModelObservability struct {
// Enabled switches model observability on or off.
Enabled bool `json:"enabled"`
// GroundTruthSource is the source configuration for ground truth data; nil when not provided.
GroundTruthSource *GroundTruthSource `json:"ground_truth_source"`
// GroundTruthJob is the configuration of the scheduled ground truth job; nil when not provided.
GroundTruthJob *GroundTruthJob `json:"ground_truth_job"`
// PredictionLogIngestionResourceRequest holds resource settings for the prediction log
// ingestion worker; nil when not provided.
PredictionLogIngestionResourceRequest *WorkerResourceRequest `json:"prediction_log_ingestion_resource_request"`
}

// GroundTruthSource represents the source configuration for ground truth data.
type GroundTruthSource struct {
// TableURN identifies the table that holds the ground truth data.
TableURN string `json:"table_urn"`
// EventTimestampColumn is the name of the column carrying the event timestamp.
EventTimestampColumn string `json:"event_timestamp_column"`
// DWHProject is presumably the data warehouse project that owns the table — TODO confirm.
DWHProject string `json:"dwh_project"`
}

// GroundTruthJob represents the configuration for a scheduled job.
// Resource values are kept as plain strings here; NOTE(review): they are presumably
// Kubernetes quantity strings (e.g. "500m", "1Gi") — confirm against the consumer of this struct.
type GroundTruthJob struct {
// CronSchedule is the schedule of the job (presumably a cron expression — TODO confirm).
CronSchedule string `json:"cron_schedule"`
// CPURequest is the CPU request of the job.
CPURequest string `json:"cpu_request"`
// CPULimit is the CPU limit of the job; nil when no limit is specified.
CPULimit *string `json:"cpu_limit"`
// MemoryRequest is the memory request of the job.
MemoryRequest string `json:"memory_request"`
// MemoryLimit is the memory limit of the job; nil when no limit is specified.
MemoryLimit *string `json:"memory_limit"`
// StartDayOffsetFromNow and EndDayOffsetFromNow are day offsets relative to the current
// time; NOTE(review): presumably they bound the window of data the job processes — confirm.
StartDayOffsetFromNow int `json:"start_day_offset_from_now"`
EndDayOffsetFromNow int `json:"end_day_offset_from_now"`
// GracePeriodDay is a grace period expressed in days — TODO confirm exact semantics.
GracePeriodDay int `json:"grace_period_day"`
// ServiceAccountSecretName is the name of the secret holding the service account used by the job.
ServiceAccountSecretName string `json:"service_account_secret_name"`
}

// WorkerResourceRequest represents the resource request for a worker (prediction log ingestion kafka consumer) deployment.
// All fields are optional overrides; an unset field leaves the corresponding default in place.
type WorkerResourceRequest struct {
// CPURequest is the CPU request for the worker; nil when not specified.
CPURequest *resource.Quantity `json:"cpu_request"`
// MemoryRequest is the memory request for the worker; nil when not specified.
MemoryRequest *resource.Quantity `json:"memory_request"`
// Replica is the number of worker replicas; NOTE(review): values <= 0 appear to be
// treated as "not specified" by the deployer — confirm.
Replica int32 `json:"replica"`
}

// Value implements driver.Valuer by encoding the configuration as JSON so it
// can be written to a single database column.
func (mlob ModelObservability) Value() (driver.Value, error) {
	payload, err := json.Marshal(mlob)
	return payload, err
}

// Scan implements sql.Scanner by decoding a JSON document read from the
// database into the receiver.
//
// Accepted driver values:
//   - nil (SQL NULL): the receiver is reset to its zero value and no error is returned;
//   - []byte: decoded as JSON;
//   - string: decoded as JSON (some drivers deliver JSON/text columns as string).
//
// Any other type yields an error.
func (mlob *ModelObservability) Scan(value interface{}) error {
	switch raw := value.(type) {
	case nil:
		// A NULL column carries no configuration; clear any stale contents.
		*mlob = ModelObservability{}
		return nil
	case []byte:
		// Decode straight into the receiver; taking the address of the
		// receiver pointer again is unnecessary.
		return json.Unmarshal(raw, mlob)
	case string:
		return json.Unmarshal([]byte(raw), mlob)
	default:
		return errors.New("type assertion to []byte failed")
	}
}
10 changes: 6 additions & 4 deletions api/models/observability_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ type WorkerData struct {
ModelVersion string
Revision int
TopicSource string
ResourceRequest *WorkerResourceRequest
}

func NewWorkerData(modelVersion *Version, model *Model, observabilityPublisher *ObservabilityPublisher) *WorkerData {
func NewWorkerData(modelVersion *Version, model *Model, observabilityPublisher *ObservabilityPublisher, resourceRequest *WorkerResourceRequest) *WorkerData {
return &WorkerData{
ModelName: model.Name,
Project: model.Project.Name,
Expand All @@ -54,9 +55,10 @@ func NewWorkerData(modelVersion *Version, model *Model, observabilityPublisher *
Team: model.Project.Team,
Labels: model.Project.Labels,
},
ModelVersion: modelVersion.ID.String(),
Revision: observabilityPublisher.Revision,
TopicSource: getPredictionLogTopicForVersion(model.Project.Name, model.Name, modelVersion.ID.String()),
ModelVersion: modelVersion.ID.String(),
Revision: observabilityPublisher.Revision,
TopicSource: getPredictionLogTopicForVersion(model.Project.Name, model.Name, modelVersion.ID.String()),
ResourceRequest: resourceRequest,
}
}

Expand Down
15 changes: 13 additions & 2 deletions api/models/version_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ type VersionEndpoint struct {
// AutoscalingPolicy controls the conditions when autoscaling should be triggered
AutoscalingPolicy *autoscaling.AutoscalingPolicy `json:"autoscaling_policy" gorm:"autoscaling_policy"`
// Protocol to be used when deploying the model
Protocol protocol.Protocol `json:"protocol" gorm:"protocol"`
EnableModelObservability bool `json:"enable_model_observability" gorm:"enable_model_observability"`
Protocol protocol.Protocol `json:"protocol" gorm:"protocol"`
// EnableModelObservability indicates whether model observability should be enabled for the version endpoint.
// This flag will be deprecated in the future; please use ModelObservability.Enabled instead.
EnableModelObservability bool `json:"enable_model_observability" gorm:"enable_model_observability"`
// ModelObservability configuration
ModelObservability *ModelObservability `json:"model_observability" gorm:"model_observability"`
CreatedUpdated
}

Expand Down Expand Up @@ -127,6 +131,13 @@ func (ve *VersionEndpoint) IsServing() bool {
return ve.Status == EndpointServing
}

// IsModelMonitoringEnabled reports whether model monitoring is enabled for
// the endpoint. The ModelObservability configuration takes precedence when it
// is present; otherwise the legacy EnableModelObservability flag is used.
func (ve *VersionEndpoint) IsModelMonitoringEnabled() bool {
	if cfg := ve.ModelObservability; cfg != nil {
		return cfg.Enabled
	}
	return ve.EnableModelObservability
}

func (ve *VersionEndpoint) Hostname() string {
if ve.URL == "" {
return ""
Expand Down
53 changes: 53 additions & 0 deletions api/models/version_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,56 @@ func TestVersionEndpoint_Path(t *testing.T) {
})
}
}

// TestVersionEndpoint_IsModelMonitoringEnabled verifies that the legacy
// EnableModelObservability flag is honoured only when no ModelObservability
// configuration is set, and that ModelObservability.Enabled is used otherwise.
func TestVersionEndpoint_IsModelMonitoringEnabled(t *testing.T) {
	type testCase struct {
		name     string
		endpoint *VersionEndpoint
		expected bool
	}

	cases := []testCase{
		{
			// ModelObservability is left at its nil zero value.
			name:     "model observability is nil but enable model observability is true",
			endpoint: &VersionEndpoint{EnableModelObservability: true},
			expected: true,
		},
		{
			// ModelObservability is left at its nil zero value.
			name:     "model observability is nil and enable model observability is false",
			endpoint: &VersionEndpoint{EnableModelObservability: false},
			expected: false,
		},
		{
			name: "model observability is not nil and enabled is true",
			endpoint: &VersionEndpoint{
				ModelObservability:       &ModelObservability{Enabled: true},
				EnableModelObservability: true,
			},
			expected: true,
		},
		{
			name: "model observability is not nil and enabled is false",
			endpoint: &VersionEndpoint{
				ModelObservability:       &ModelObservability{Enabled: false},
				EnableModelObservability: false,
			},
			expected: false,
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			actual := tc.endpoint.IsModelMonitoringEnabled()
			if actual == tc.expected {
				return
			}
			t.Errorf("VersionEndpoint.IsModelMonitoringEnabled() = %v, want %v", actual, tc.expected)
		})
	}
}
30 changes: 27 additions & 3 deletions api/pkg/observability/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,30 @@ func (c *deployer) getLabels(data *models.WorkerData) map[string]string {
return labels
}

// getResources returns the resource requests and limits for the worker
// container, in that order.
//
// It starts from copies of the deployer's default requests and limits, so the
// defaults are never mutated, and then applies the per-worker overrides in
// data.ResourceRequest when present:
//   - a CPU request override replaces the default CPU request and removes the
//     default CPU limit;
//   - a memory request override replaces the default memory request and is
//     also used as the memory limit.
func (c *deployer) getResources(data *models.WorkerData) (corev1.ResourceList, corev1.ResourceList) {
	requests := c.resourceRequest.DeepCopy()
	limits := c.resourceLimit.DeepCopy()

	override := data.ResourceRequest
	if override == nil {
		return requests, limits
	}

	if cpuReq := override.CPURequest; cpuReq != nil {
		// DeepCopy of a nil ResourceList is nil; allocate before writing to
		// avoid a nil-map assignment panic when no defaults are configured.
		if requests == nil {
			requests = corev1.ResourceList{}
		}
		requests[corev1.ResourceCPU] = *cpuReq
		// remove default limits (delete is a no-op on a nil map)
		delete(limits, corev1.ResourceCPU)
	}

	if memoryReq := override.MemoryRequest; memoryReq != nil {
		if requests == nil {
			requests = corev1.ResourceList{}
		}
		if limits == nil {
			limits = corev1.ResourceList{}
		}
		requests[corev1.ResourceMemory] = *memoryReq
		limits[corev1.ResourceMemory] = *memoryReq
	}

	return requests, limits
}

func (c *deployer) createDeploymentSpec(data *models.WorkerData, secretName string) (*appsv1.Deployment, error) {
labels := c.getLabels(data)

cfgVolName := "config-volume"
workerContainer := "worker"

requestsResources, limitsResources := c.getResources(data)
podSpec := corev1.PodSpec{
Containers: []corev1.Container{
{
Expand All @@ -361,8 +380,8 @@ func (c *deployer) createDeploymentSpec(data *models.WorkerData, secretName stri
ImagePullPolicy: corev1.PullIfNotPresent,

Resources: corev1.ResourceRequirements{
Requests: c.resourceRequest,
Limits: c.resourceLimit,
Requests: requestsResources,
Limits: limitsResources,
},
VolumeMounts: []corev1.VolumeMount{
{
Expand Down Expand Up @@ -392,6 +411,11 @@ func (c *deployer) createDeploymentSpec(data *models.WorkerData, secretName stri
},
}
podSpecWithIdentity := enrichIdentityToPod(podSpec, c.consumerConfig.ServiceAccountSecretName, []string{workerContainer})
numReplicas := c.consumerConfig.Replicas
if data.ResourceRequest != nil && data.ResourceRequest.Replica > 0 {
numReplicas = data.ResourceRequest.Replica
}

return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: c.getDeploymentName(data),
Expand All @@ -407,7 +431,7 @@ func (c *deployer) createDeploymentSpec(data *models.WorkerData, secretName stri
appLabelKey: data.Metadata.App,
},
},
Replicas: &c.consumerConfig.Replicas,
Replicas: &numReplicas,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Expand Down
Loading

0 comments on commit a5a3e0d

Please sign in to comment.