Skip to content

Commit

Permalink
feat: Add observability publisher deployment (#550)
Browse files Browse the repository at this point in the history
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->
# Description
<!-- Briefly describe the motivation for the change. Please include
illustrations where appropriate. -->
Add capability to deploy observability publisher (please suggest the
better name if you have any) that consume prediction log that is
produced by the model. This observability publisher will be deployed
once the model is serving or model redeployment if the model already
serve the traffic.

# Modifications
<!-- Summarize the key code changes. -->
* Adding event producer (`api/pkg/observability/event/event.go`) for
observability publisher. This event producer will produce deployment
event for observability publisher, which later on is consumed by the
deployment worker in merlin control plane to do actual deployment.
* Actual action for deployment
(`api/queue/work/observability_publisher_deployment.go`) that consume
event that is produced by the producer. There are several condition:
* Deployment is skip if the more latest revision is already queued
(revision in DB is greater than the one is observed by the
`observability_publisher_deployment`
* Deployment will be delayed by requeue if there is still ongoing
deployment for previous revision
* Deployment will be skip if there is ongoing deployment for newer
revision compare to current observed revision by the
`observability_publisher_deployment`
   
Once the conditions are met, it will deploy or undeploy k8s deployment
and secret manifest
* Add deployer (`api/pkg/observability/deployment/deployer.go`) to
interact with k8s control plane. There are 3 methods in this deployer
  * `Deploy` -> Create or update k8s secret and deployment manifest
  * `Undeploy` -> Delete k8s secret and deployment manifest
* `GetDeployedManifest` -> Get deployed manifest that contains k8s
secret and manifest, and status of deployment
* Adding new field in model `observability_supported` as a gate whether
the model is allowed to publish observability data. Observability
publisher only be deployed if the model `observability_supported` is
true and the model endpoint also enable model observability
# Tests
<!-- Besides the existing / updated automated tests, what specific
scenarios should be tested? Consider the backward compatibility of the
changes, whether corner cases are covered, etc. Please describe the
tests and check the ones that have been completed. Eg:
- [x] Deploying new and existing standard models
- [ ] Deploying PyFunc models
-->

# Checklist
- [x] Added PR label
- [x] Added unit test, integration, and/or e2e tests
- [x] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduce API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes

# Release Notes
<!--
Does this PR introduce a user-facing change?
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note

```
  • Loading branch information
tiopramayudi authored Mar 15, 2024
1 parent 82fe2bf commit 444f9eb
Show file tree
Hide file tree
Showing 36 changed files with 4,982 additions and 94 deletions.
23 changes: 15 additions & 8 deletions api/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
mlflow "github.com/caraml-dev/merlin/mlflow"
"github.com/caraml-dev/merlin/models"
"github.com/caraml-dev/merlin/pkg/gitlab"
"github.com/caraml-dev/merlin/pkg/observability/event"
"github.com/caraml-dev/merlin/queue"
"github.com/caraml-dev/merlin/queue/work"
"github.com/caraml-dev/merlin/service"
Expand Down Expand Up @@ -138,7 +139,7 @@ func main() {

dependencies := buildDependencies(ctx, cfg, db, dispatcher)

registerQueueJob(dispatcher, dependencies.modelDeployment, dependencies.batchDeployment)
registerQueueJob(dispatcher, dependencies.modelDeployment, dependencies.batchDeployment, dependencies.observabilityDeployment)
dispatcher.Start()

if err := initCronJob(dependencies, db); err != nil {
Expand Down Expand Up @@ -253,9 +254,10 @@ func newPprofRouter() *mux.Router {
return r
}

func registerQueueJob(consumer queue.Consumer, modelServiceDepl *work.ModelServiceDeployment, batchDepl *work.BatchDeployment) {
func registerQueueJob(consumer queue.Consumer, modelServiceDepl *work.ModelServiceDeployment, batchDepl *work.BatchDeployment, obsDepl *work.ObservabilityPublisherDeployment) {
consumer.RegisterJob(service.ModelServiceDeployment, modelServiceDepl.Deploy)
consumer.RegisterJob(service.BatchDeployment, batchDepl.Deploy)
consumer.RegisterJob(event.ObservabilityPublisherDeployment, obsDepl.Deploy)
}

func buildDependencies(ctx context.Context, cfg *config.Config, db *gorm.DB, dispatcher *queue.Dispatcher) deps {
Expand All @@ -268,10 +270,14 @@ func buildDependencies(ctx context.Context, cfg *config.Config, db *gorm.DB, dis

webServiceBuilder, predJobBuilder, imageBuilderJanitor := initImageBuilder(cfg)

observabilityPublisherStorage := storage.NewObservabilityPublisherStorage(db)
observabilityPublisherDeployment := initObservabilityPublisherDeployment(cfg, observabilityPublisherStorage)
versionStorage := storage.NewVersionStorage(db)
observabilityEvent := event.NewEventProducer(dispatcher, observabilityPublisherStorage, versionStorage)
clusterControllers := initClusterControllers(cfg)
modelServiceDeployment := initModelServiceDeployment(cfg, webServiceBuilder, clusterControllers, db)
modelServiceDeployment := initModelServiceDeployment(cfg, webServiceBuilder, clusterControllers, db, observabilityEvent)
versionEndpointService := initVersionEndpointService(cfg, webServiceBuilder, clusterControllers, db, coreClient, dispatcher)
modelEndpointService := initModelEndpointService(cfg, db)
modelEndpointService := initModelEndpointService(cfg, db, observabilityEvent)

batchControllers := initBatchControllers(cfg, db, mlpAPIClient)
batchDeployment := initBatchDeployment(cfg, db, batchControllers, predJobBuilder)
Expand Down Expand Up @@ -356,9 +362,10 @@ func buildDependencies(ctx context.Context, cfg *config.Config, db *gorm.DB, dis
MlflowClient: mlflowClient,
}
return deps{
apiContext: apiContext,
modelDeployment: modelServiceDeployment,
batchDeployment: batchDeployment,
imageBuilderJanitor: imageBuilderJanitor,
apiContext: apiContext,
modelDeployment: modelServiceDeployment,
batchDeployment: batchDeployment,
observabilityDeployment: observabilityPublisherDeployment,
imageBuilderJanitor: imageBuilderJanitor,
}
}
70 changes: 58 additions & 12 deletions api/cmd/api/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/caraml-dev/merlin/mlp"
"github.com/caraml-dev/merlin/models"
"github.com/caraml-dev/merlin/pkg/imagebuilder"
"github.com/caraml-dev/merlin/pkg/observability/deployment"
"github.com/caraml-dev/merlin/pkg/observability/event"
"github.com/caraml-dev/merlin/queue"
"github.com/caraml-dev/merlin/queue/work"
"github.com/caraml-dev/merlin/service"
Expand All @@ -36,10 +38,11 @@ import (
)

type deps struct {
apiContext api.AppContext
modelDeployment *work.ModelServiceDeployment
batchDeployment *work.BatchDeployment
imageBuilderJanitor *imagebuilder.Janitor
apiContext api.AppContext
modelDeployment *work.ModelServiceDeployment
batchDeployment *work.BatchDeployment
observabilityDeployment *work.ObservabilityPublisherDeployment
imageBuilderJanitor *imagebuilder.Janitor
}

func initMLPAPIClient(ctx context.Context, cfg config.MlpAPIConfig) mlp.APIClient {
Expand Down Expand Up @@ -332,7 +335,7 @@ func initEnvironmentService(cfg *config.Config, db *gorm.DB) service.Environment
return svc
}

func initModelEndpointService(cfg *config.Config, db *gorm.DB) service.ModelEndpointsService {
func initModelEndpointService(cfg *config.Config, db *gorm.DB, observabilityEvent event.EventProducer) service.ModelEndpointsService {
istioClients := make(map[string]istio.Client)
for _, env := range cfg.ClusterConfig.EnvironmentConfigs {
creds := mlpcluster.NewK8sClusterCreds(env.K8sConfig)
Expand All @@ -348,7 +351,7 @@ func initModelEndpointService(cfg *config.Config, db *gorm.DB) service.ModelEndp
istioClients[env.Name] = istioClient
}

return service.NewModelEndpointsService(istioClients, storage.NewModelEndpointStorage(db), storage.NewVersionEndpointStorage(db), cfg.Environment)
return service.NewModelEndpointsService(istioClients, storage.NewModelEndpointStorage(db), storage.NewVersionEndpointStorage(db), cfg.Environment, observabilityEvent)
}

func initBatchDeployment(cfg *config.Config, db *gorm.DB, controllers map[string]batch.Controller, builder imagebuilder.ImageBuilder) *work.BatchDeployment {
Expand Down Expand Up @@ -415,13 +418,56 @@ func initPredictionJobService(cfg *config.Config, controllers map[string]batch.C
return service.NewPredictionJobService(controllers, builder, predictionJobStorage, clock.RealClock{}, cfg.Environment, producer)
}

func initModelServiceDeployment(cfg *config.Config, builder imagebuilder.ImageBuilder, controllers map[string]cluster.Controller, db *gorm.DB) *work.ModelServiceDeployment {
func initModelServiceDeployment(cfg *config.Config, builder imagebuilder.ImageBuilder, controllers map[string]cluster.Controller, db *gorm.DB, observabilityEvent event.EventProducer) *work.ModelServiceDeployment {
return &work.ModelServiceDeployment{
ClusterControllers: controllers,
ImageBuilder: builder,
Storage: storage.NewVersionEndpointStorage(db),
DeploymentStorage: storage.NewDeploymentStorage(db),
LoggerDestinationURL: cfg.LoggerDestinationURL,
ClusterControllers: controllers,
ImageBuilder: builder,
Storage: storage.NewVersionEndpointStorage(db),
DeploymentStorage: storage.NewDeploymentStorage(db),
LoggerDestinationURL: cfg.LoggerDestinationURL,
ObservabilityEventProducer: observabilityEvent,
}
}

func initObservabilityPublisherDeployment(cfg *config.Config, observabilityPublisherStorage storage.ObservabilityPublisherStorage) *work.ObservabilityPublisherDeployment {
var envCfg *config.EnvironmentConfig
for _, env := range cfg.ClusterConfig.EnvironmentConfigs {
if env.Name == cfg.ObservabilityPublisher.EnvironmentName {
envCfg = env
break
}
}
if envCfg == nil {
log.Panicf("could not find destination environment for observability publisher")
}

clusterCfg := cluster.Config{
ClusterName: envCfg.Cluster,
GcpProject: envCfg.GcpProject,
}

var restConfig *rest.Config
var err error
if cfg.ClusterConfig.InClusterConfig {
restConfig, err = rest.InClusterConfig()
if err != nil {
log.Panicf("unable to get in cluster configs: %v", err)
}
} else {
creds := mlpcluster.NewK8sClusterCreds(envCfg.K8sConfig)
restConfig, err = creds.ToRestConfig()
if err != nil {
log.Panicf("unable to get cluster config of cluster: %s %v", clusterCfg.ClusterName, err)
}
}
deployer, err := deployment.New(restConfig, cfg.ObservabilityPublisher)
if err != nil {
log.Panicf("unable to initialize observability deployer with err: %w", err)
}

return &work.ObservabilityPublisherDeployment{
Deployer: deployer,
ObservabilityPublisherStorage: observabilityPublisherStorage,
}
}

Expand Down
1 change: 1 addition & 0 deletions api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Config struct {
MlflowConfig MlflowConfig
PyFuncPublisherConfig PyFuncPublisherConfig
InferenceServiceDefaults InferenceServiceDefaults
ObservabilityPublisher ObservabilityPublisher
}

// UIConfig stores the configuration for the UI.
Expand Down
38 changes: 38 additions & 0 deletions api/config/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package config

import "time"

// ObservabilityPublisher
type ObservabilityPublisher struct {
ArizeSink ArizeSink
BigQuerySink BigQuerySink
KafkaConsumer KafkaConsumer
ImageName string
DefaultResources ResourceRequestsLimits
EnvironmentName string
Replicas int32
TargetNamespace string
ServiceAccountName string
DeploymentTimeout time.Duration `default:"30m"`
}

// KafkaConsumer
type KafkaConsumer struct {
Brokers string `validate:"required"`
BatchSize int
GroupID string
AdditionalConsumerConfig map[string]string
}

// ArizeSink
type ArizeSink struct {
APIKey string
SpaceKey string
}

// BigQuerySink
type BigQuerySink struct {
Project string
Dataset string
TTLDays int
}
15 changes: 8 additions & 7 deletions api/models/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ type CreatedUpdated struct {
}

type Model struct {
ID ID `json:"id"`
Name string `json:"name" validate:"required,min=3,max=25,subdomain_rfc1123"`
ProjectID ID `json:"project_id"`
Project mlp.Project `json:"-" gorm:"-"`
ExperimentID ID `json:"mlflow_experiment_id" gorm:"column:mlflow_experiment_id"`
Type string `json:"type" gorm:"type"`
MlflowURL string `json:"mlflow_url" gorm:"-"`
ID ID `json:"id"`
Name string `json:"name" validate:"required,min=3,max=25,subdomain_rfc1123"`
ProjectID ID `json:"project_id"`
Project mlp.Project `json:"-" gorm:"-"`
ExperimentID ID `json:"mlflow_experiment_id" gorm:"column:mlflow_experiment_id"`
Type string `json:"type" gorm:"type"`
MlflowURL string `json:"mlflow_url" gorm:"-"`
ObservabilitySupported bool `json:"observability_supported" gorm:"column:observability_supported"`

Endpoints []*ModelEndpoint `json:"endpoints" gorm:"foreignkey:ModelID;"`

Expand Down
8 changes: 8 additions & 0 deletions api/models/model_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ type ModelEndpoint struct {
CreatedUpdated
}

func (me *ModelEndpoint) GetVersionEndpoint() *VersionEndpoint {
if me.Rule == nil || len(me.Rule.Destination) == 0 {
return nil
}
destination := me.Rule.Destination[0]
return destination.VersionEndpoint
}

// ModelEndpointRule describes model's endpoint traffic rule.
type ModelEndpointRule struct {
Destination []*ModelEndpointRuleDestination `json:"destinations"`
Expand Down
52 changes: 33 additions & 19 deletions api/models/model_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ type ModelSchema struct {

// SchemaSpec
type SchemaSpec struct {
SessionIDColumn string `json:"session_id_column"`
RowIDColumn string `json:"row_id_column"`
ModelPredictionOutput *ModelPredictionOutput `json:"model_prediction_output"`
TagColumns []string `json:"tag_columns"`
FeatureTypes map[string]ValueType `json:"feature_types"`
FeatureOrders []string `json:"feature_orders"`
SessionIDColumn string `json:"session_id_column" yaml:"session_id_column"`
RowIDColumn string `json:"row_id_column" yaml:"row_id_column"`
ModelPredictionOutput *ModelPredictionOutput `json:"model_prediction_output" yaml:"model_prediction_output"`
TagColumns []string `json:"tag_columns" yaml:"tag_columns"`
FeatureTypes map[string]ValueType `json:"feature_types" yaml:"feature_types"`
FeatureOrders []string `json:"feature_orders" yaml:"feature_orders"`
}

// Value returning a value for `SchemaSpec` instance
Expand Down Expand Up @@ -125,27 +125,41 @@ func (m ModelPredictionOutput) MarshalJSON() ([]byte, error) {
return nil, nil
}

func (m ModelPredictionOutput) MarshalYAML() (interface{}, error) {
var in interface{}
if m.BinaryClassificationOutput != nil {
in = m.BinaryClassificationOutput
} else if m.RankingOutput != nil {
in = m.RankingOutput
} else if m.RegressionOutput != nil {
in = m.RegressionOutput
} else {
return nil, fmt.Errorf("not valid model prediction output")
}
return in, nil
}

// BinaryClassificationOutput is specification for prediction of binary classification model
type BinaryClassificationOutput struct {
ActualScoreColumn string `json:"actual_score_column"`
NegativeClassLabel string `json:"negative_class_label"`
PredictionScoreColumn string `json:"prediction_score_column"`
PredictionLabelColumn string `json:"prediction_label_column"`
PositiveClassLabel string `json:"positive_class_label"`
ScoreThreshold *float64 `json:"score_threshold,omitempty"`
OutputClass ModelPredictionOutputClass `json:"output_class" validate:"required"`
ActualScoreColumn string `json:"actual_score_column" yaml:"actual_score_column"`
NegativeClassLabel string `json:"negative_class_label" yaml:"negative_class_label"`
PredictionScoreColumn string `json:"prediction_score_column" yaml:"prediction_score_column"`
PredictionLabelColumn string `json:"prediction_label_column" yaml:"prediction_label_column"`
PositiveClassLabel string `json:"positive_class_label" yaml:"positive_class_label"`
ScoreThreshold *float64 `json:"score_threshold,omitempty" yaml:"score_threshold"`
OutputClass ModelPredictionOutputClass `json:"output_class" yaml:"output_class" validate:"required"`
}

// RankingOutput is specification for prediction of ranking model
type RankingOutput struct {
RankScoreColumn string `json:"rank_score_column"`
RelevanceScoreColumn string `json:"relevance_score_column"`
OutputClass ModelPredictionOutputClass `json:"output_class" validate:"required"`
RankScoreColumn string `json:"rank_score_column" yaml:"rank_score_column"`
RelevanceScoreColumn string `json:"relevance_score_column" yaml:"relevance_score_column"`
OutputClass ModelPredictionOutputClass `json:"output_class" yaml:"output_class" validate:"required"`
}

// Regression is specification for prediction of regression model
type RegressionOutput struct {
PredictionScoreColumn string `json:"prediction_score_column"`
ActualScoreColumn string `json:"actual_score_column"`
OutputClass ModelPredictionOutputClass `json:"output_class" validate:"required"`
PredictionScoreColumn string `json:"prediction_score_column" yaml:"prediction_score_column"`
ActualScoreColumn string `json:"actual_score_column" yaml:"actual_score_column"`
OutputClass ModelPredictionOutputClass `json:"output_class" yaml:"output_class" validate:"required"`
}
67 changes: 67 additions & 0 deletions api/models/observability_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package models

import (
"fmt"
)

// PublisherStatus
type PublisherStatus string

const (
Pending PublisherStatus = "pending"
Running PublisherStatus = "running"
Failed PublisherStatus = "failed"
Terminated PublisherStatus = "terminated"
)

// ObservabilityPublisher
type ObservabilityPublisher struct {
ID ID `gorm:"id"`
VersionModelID ID `gorm:"version_model_id"`
VersionID ID `gorm:"version_id"`
Revision int `gorm:"revision"`
Status PublisherStatus `gorm:"status"`
ModelSchemaSpec *SchemaSpec `gorm:"model_schema_spec"`
CreatedUpdated
}

type ActionType string

const (
DeployPublisher ActionType = "deploy"
UndeployPublisher ActionType = "delete"
)

type WorkerData struct {
Project string
ModelSchemaSpec *SchemaSpec
Metadata Metadata
ModelName string
ModelVersion string
Revision int
TopicSource string
}

func NewWorkerData(modelVersion *Version, model *Model, observabilityPublisher *ObservabilityPublisher) *WorkerData {
return &WorkerData{
ModelName: model.Name,
Project: model.Project.Name,
ModelSchemaSpec: observabilityPublisher.ModelSchemaSpec,
Metadata: Metadata{
App: fmt.Sprintf("%s-observability-publisher", model.Name),
Component: "worker",
Stream: model.Project.Stream,
Team: model.Project.Team,
Labels: model.Project.Labels,
},
ModelVersion: modelVersion.ID.String(),
Revision: observabilityPublisher.Revision,
TopicSource: getPredictionLogTopicForVersion(model.Project.Name, model.Name, modelVersion.ID.String()),
}
}

type ObservabilityPublisherJob struct {
ActionType ActionType
Publisher *ObservabilityPublisher
WorkerData *WorkerData
}
Loading

0 comments on commit 444f9eb

Please sign in to comment.