Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: Add observability publisher deployment #550

Merged
merged 12 commits into from
Mar 15, 2024
23 changes: 15 additions & 8 deletions api/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
mlflow "github.com/caraml-dev/merlin/mlflow"
"github.com/caraml-dev/merlin/models"
"github.com/caraml-dev/merlin/pkg/gitlab"
"github.com/caraml-dev/merlin/pkg/observability/event"
"github.com/caraml-dev/merlin/queue"
"github.com/caraml-dev/merlin/queue/work"
"github.com/caraml-dev/merlin/service"
Expand Down Expand Up @@ -138,7 +139,7 @@ func main() {

dependencies := buildDependencies(ctx, cfg, db, dispatcher)

registerQueueJob(dispatcher, dependencies.modelDeployment, dependencies.batchDeployment)
registerQueueJob(dispatcher, dependencies.modelDeployment, dependencies.batchDeployment, dependencies.observabilityDeployment)
dispatcher.Start()

if err := initCronJob(dependencies, db); err != nil {
Expand Down Expand Up @@ -253,9 +254,10 @@ func newPprofRouter() *mux.Router {
return r
}

func registerQueueJob(consumer queue.Consumer, modelServiceDepl *work.ModelServiceDeployment, batchDepl *work.BatchDeployment) {
func registerQueueJob(consumer queue.Consumer, modelServiceDepl *work.ModelServiceDeployment, batchDepl *work.BatchDeployment, obsDepl *work.ObservabilityPublisherDeployment) {
consumer.RegisterJob(service.ModelServiceDeployment, modelServiceDepl.Deploy)
consumer.RegisterJob(service.BatchDeployment, batchDepl.Deploy)
consumer.RegisterJob(event.ObservabilityPublisherDeployment, obsDepl.Deploy)
}

func buildDependencies(ctx context.Context, cfg *config.Config, db *gorm.DB, dispatcher *queue.Dispatcher) deps {
Expand All @@ -268,10 +270,14 @@ func buildDependencies(ctx context.Context, cfg *config.Config, db *gorm.DB, dis

webServiceBuilder, predJobBuilder, imageBuilderJanitor := initImageBuilder(cfg)

observabilityPublisherStorage := storage.NewObservabilityPublisherStorage(db)
observabilityPublisherDeployment := initObservabilityPublisherDeployment(cfg, observabilityPublisherStorage)
versionStorage := storage.NewVersionStorage(db)
observabilityEvent := event.NewEventProducer(dispatcher, observabilityPublisherStorage, versionStorage)
clusterControllers := initClusterControllers(cfg)
modelServiceDeployment := initModelServiceDeployment(cfg, webServiceBuilder, clusterControllers, db)
modelServiceDeployment := initModelServiceDeployment(cfg, webServiceBuilder, clusterControllers, db, observabilityEvent)
versionEndpointService := initVersionEndpointService(cfg, webServiceBuilder, clusterControllers, db, coreClient, dispatcher)
modelEndpointService := initModelEndpointService(cfg, db)
modelEndpointService := initModelEndpointService(cfg, db, observabilityEvent)

batchControllers := initBatchControllers(cfg, db, mlpAPIClient)
batchDeployment := initBatchDeployment(cfg, db, batchControllers, predJobBuilder)
Expand Down Expand Up @@ -356,9 +362,10 @@ func buildDependencies(ctx context.Context, cfg *config.Config, db *gorm.DB, dis
MlflowClient: mlflowClient,
}
return deps{
apiContext: apiContext,
modelDeployment: modelServiceDeployment,
batchDeployment: batchDeployment,
imageBuilderJanitor: imageBuilderJanitor,
apiContext: apiContext,
modelDeployment: modelServiceDeployment,
batchDeployment: batchDeployment,
observabilityDeployment: observabilityPublisherDeployment,
imageBuilderJanitor: imageBuilderJanitor,
}
}
70 changes: 58 additions & 12 deletions api/cmd/api/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/caraml-dev/merlin/mlp"
"github.com/caraml-dev/merlin/models"
"github.com/caraml-dev/merlin/pkg/imagebuilder"
"github.com/caraml-dev/merlin/pkg/observability/deployment"
"github.com/caraml-dev/merlin/pkg/observability/event"
"github.com/caraml-dev/merlin/queue"
"github.com/caraml-dev/merlin/queue/work"
"github.com/caraml-dev/merlin/service"
Expand All @@ -36,10 +38,11 @@ import (
)

type deps struct {
apiContext api.AppContext
modelDeployment *work.ModelServiceDeployment
batchDeployment *work.BatchDeployment
imageBuilderJanitor *imagebuilder.Janitor
apiContext api.AppContext
modelDeployment *work.ModelServiceDeployment
batchDeployment *work.BatchDeployment
observabilityDeployment *work.ObservabilityPublisherDeployment
imageBuilderJanitor *imagebuilder.Janitor
}

func initMLPAPIClient(ctx context.Context, cfg config.MlpAPIConfig) mlp.APIClient {
Expand Down Expand Up @@ -332,7 +335,7 @@ func initEnvironmentService(cfg *config.Config, db *gorm.DB) service.Environment
return svc
}

func initModelEndpointService(cfg *config.Config, db *gorm.DB) service.ModelEndpointsService {
func initModelEndpointService(cfg *config.Config, db *gorm.DB, observabilityEvent event.EventProducer) service.ModelEndpointsService {
istioClients := make(map[string]istio.Client)
for _, env := range cfg.ClusterConfig.EnvironmentConfigs {
creds := mlpcluster.NewK8sClusterCreds(env.K8sConfig)
Expand All @@ -348,7 +351,7 @@ func initModelEndpointService(cfg *config.Config, db *gorm.DB) service.ModelEndp
istioClients[env.Name] = istioClient
}

return service.NewModelEndpointsService(istioClients, storage.NewModelEndpointStorage(db), storage.NewVersionEndpointStorage(db), cfg.Environment)
return service.NewModelEndpointsService(istioClients, storage.NewModelEndpointStorage(db), storage.NewVersionEndpointStorage(db), cfg.Environment, observabilityEvent)
}

func initBatchDeployment(cfg *config.Config, db *gorm.DB, controllers map[string]batch.Controller, builder imagebuilder.ImageBuilder) *work.BatchDeployment {
Expand Down Expand Up @@ -415,13 +418,56 @@ func initPredictionJobService(cfg *config.Config, controllers map[string]batch.C
return service.NewPredictionJobService(controllers, builder, predictionJobStorage, clock.RealClock{}, cfg.Environment, producer)
}

func initModelServiceDeployment(cfg *config.Config, builder imagebuilder.ImageBuilder, controllers map[string]cluster.Controller, db *gorm.DB) *work.ModelServiceDeployment {
func initModelServiceDeployment(cfg *config.Config, builder imagebuilder.ImageBuilder, controllers map[string]cluster.Controller, db *gorm.DB, observabilityEvent event.EventProducer) *work.ModelServiceDeployment {
return &work.ModelServiceDeployment{
ClusterControllers: controllers,
ImageBuilder: builder,
Storage: storage.NewVersionEndpointStorage(db),
DeploymentStorage: storage.NewDeploymentStorage(db),
LoggerDestinationURL: cfg.LoggerDestinationURL,
ClusterControllers: controllers,
ImageBuilder: builder,
Storage: storage.NewVersionEndpointStorage(db),
DeploymentStorage: storage.NewDeploymentStorage(db),
LoggerDestinationURL: cfg.LoggerDestinationURL,
ObservabilityEventProducer: observabilityEvent,
}
}

// initObservabilityPublisherDeployment builds the work.ObservabilityPublisherDeployment
// used to deploy observability publishers.
//
// It selects the environment config whose Name equals cfg.Environment, obtains a
// Kubernetes REST config for it (the in-cluster config when
// cfg.ClusterConfig.InClusterConfig is set, otherwise one derived from the
// environment's K8sConfig credentials) and passes that config together with
// cfg.ObservabilityPublisher to deployment.New.
//
// Every failure along the way is reported through log.Panicf; no error is
// returned to the caller.
func initObservabilityPublisherDeployment(cfg *config.Config, observabilityPublisherStorage storage.ObservabilityPublisherStorage) *work.ObservabilityPublisherDeployment {
	// NOTE(review): config.ObservabilityPublisher also declares an EnvironmentName
	// field; confirm that matching on cfg.Environment is the intended lookup key.
	var envCfg *config.EnvironmentConfig
	for _, env := range cfg.ClusterConfig.EnvironmentConfigs {
		if env.Name == cfg.Environment {
			envCfg = env
			break
		}
	}
	if envCfg == nil {
		log.Panicf("could not find destination environment for observability publisher")
	}

	var restConfig *rest.Config
	var err error
	if cfg.ClusterConfig.InClusterConfig {
		restConfig, err = rest.InClusterConfig()
		if err != nil {
			log.Panicf("unable to get in cluster configs: %v", err)
		}
	} else {
		creds := mlpcluster.NewK8sClusterCreds(envCfg.K8sConfig)
		restConfig, err = creds.ToRestConfig()
		if err != nil {
			log.Panicf("unable to get cluster config of cluster: %s %v", envCfg.Cluster, err)
		}
	}

	deployer, err := deployment.New(restConfig, cfg.ObservabilityPublisher)
	if err != nil {
		// %w is only understood by fmt.Errorf; in a log format string it renders
		// as "%!w(...)", so the error is formatted with %v instead.
		log.Panicf("unable to initialize observability deployer with err: %v", err)
	}

	return &work.ObservabilityPublisherDeployment{
		Deployer:                      deployer,
		ObservabilityPublisherStorage: observabilityPublisherStorage,
	}
}

Expand Down
1 change: 1 addition & 0 deletions api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Config struct {
MlflowConfig MlflowConfig
PyFuncPublisherConfig PyFuncPublisherConfig
InferenceServiceDefaults InferenceServiceDefaults
ObservabilityPublisher ObservabilityPublisher
}

// UIConfig stores the configuration for the UI.
Expand Down
34 changes: 34 additions & 0 deletions api/config/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package config

// ObservabilityPublisher groups the configuration of the observability
// publisher: its sinks, its Kafka consumer, and deployment settings such as
// image, resources and replica count.
type ObservabilityPublisher struct {
// ArizeSink holds the Arize sink settings.
ArizeSink ArizeSink
// BigQuerySink holds the BigQuery sink settings.
BigQuerySink BigQuerySink
// BatchSize is a publisher-level batch size, separate from
// KafkaConsumer.BatchSize. NOTE(review): what is being batched is not
// visible here — confirm against the deployer.
BatchSize int
// KafkaConsumer holds the Kafka consumer settings.
KafkaConsumer KafkaConsumer
// ImageName is presumably the container image of the publisher — confirm.
ImageName string
// DefaultResources is presumably the default resource requests/limits
// applied to the publisher workload — confirm.
DefaultResources ResourceRequestsLimits
// EnvironmentName presumably names the environment the publisher is
// deployed to — TODO confirm where it is consumed.
EnvironmentName string
// Replicas is presumably the replica count of the publisher deployment — confirm.
Replicas int32
}

// KafkaConsumer holds the Kafka consumer settings of the observability publisher.
type KafkaConsumer struct {
// Brokers is the only field marked as required by the validate tag.
// NOTE(review): a plain string — presumably a comma-separated broker list; confirm.
Brokers string `validate:"required"`
// BatchSize is a consumer-level batch size, separate from
// ObservabilityPublisher.BatchSize.
BatchSize int
// GroupID is presumably the Kafka consumer group id — confirm.
GroupID string
// AdditionalConsumerConfig carries extra string key/value consumer settings.
AdditionalConsumerConfig map[string]string
}

// ArizeSink holds the settings of the Arize sink.
// NOTE(review): both fields look like credentials — presumably they should be
// treated as secrets and kept out of logs; confirm.
type ArizeSink struct {
APIKey string
SpaceKey string
}

// BigQuerySink holds the settings of the BigQuery sink.
type BigQuerySink struct {
// Project is presumably the GCP project that owns the dataset — confirm.
Project string
// Dataset is presumably the BigQuery dataset written to — confirm.
Dataset string
// TTLDays is presumably a retention period expressed in days — confirm
// what it applies to (table, partition, ...).
TTLDays int
}
15 changes: 8 additions & 7 deletions api/models/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ type CreatedUpdated struct {
}

type Model struct {
ID ID `json:"id"`
Name string `json:"name" validate:"required,min=3,max=25,subdomain_rfc1123"`
ProjectID ID `json:"project_id"`
Project mlp.Project `json:"-" gorm:"-"`
ExperimentID ID `json:"mlflow_experiment_id" gorm:"column:mlflow_experiment_id"`
Type string `json:"type" gorm:"type"`
MlflowURL string `json:"mlflow_url" gorm:"-"`
ID ID `json:"id"`
Name string `json:"name" validate:"required,min=3,max=25,subdomain_rfc1123"`
ProjectID ID `json:"project_id"`
Project mlp.Project `json:"-" gorm:"-"`
ExperimentID ID `json:"mlflow_experiment_id" gorm:"column:mlflow_experiment_id"`
Type string `json:"type" gorm:"type"`
MlflowURL string `json:"mlflow_url" gorm:"-"`
ObservabilitySupported bool `json:"-" gorm:"column:observability_supported"`

Endpoints []*ModelEndpoint `json:"endpoints" gorm:"foreignkey:ModelID;"`

Expand Down
8 changes: 8 additions & 0 deletions api/models/model_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ type ModelEndpoint struct {
CreatedUpdated
}

// GetVersionEndpoint returns the VersionEndpoint of the first destination in
// the endpoint's rule.
//
// It returns nil when the endpoint has no rule, the rule has no destinations,
// or the first destination entry is itself nil. Only the first destination is
// considered; any further destinations are ignored.
func (me *ModelEndpoint) GetVersionEndpoint() *VersionEndpoint {
	if me.Rule == nil || len(me.Rule.Destination) == 0 {
		return nil
	}
	// Destination is a slice of pointers, so an entry may be nil; guard against
	// it instead of dereferencing.
	destination := me.Rule.Destination[0]
	if destination == nil {
		return nil
	}
	return destination.VersionEndpoint
}

// ModelEndpointRule describes model's endpoint traffic rule.
type ModelEndpointRule struct {
Destination []*ModelEndpointRuleDestination `json:"destinations"`
Expand Down
52 changes: 33 additions & 19 deletions api/models/model_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ type ModelSchema struct {

// SchemaSpec
type SchemaSpec struct {
SessionIDColumn string `json:"session_id_column"`
RowIDColumn string `json:"row_id_column"`
ModelPredictionOutput *ModelPredictionOutput `json:"model_prediction_output"`
TagColumns []string `json:"tag_columns"`
FeatureTypes map[string]ValueType `json:"feature_types"`
FeatureOrders []string `json:"feature_orders"`
SessionIDColumn string `json:"session_id_column" yaml:"session_id_column"`
RowIDColumn string `json:"row_id_column" yaml:"row_id_column"`
ModelPredictionOutput *ModelPredictionOutput `json:"model_prediction_output" yaml:"model_prediction_output"`
TagColumns []string `json:"tag_columns" yaml:"tag_columns"`
FeatureTypes map[string]ValueType `json:"feature_types" yaml:"feature_types"`
FeatureOrders []string `json:"feature_orders" yaml:"feature_orders"`
}

// Value returning a value for `SchemaSpec` instance
Expand Down Expand Up @@ -125,27 +125,41 @@ func (m ModelPredictionOutput) MarshalJSON() ([]byte, error) {
return nil, nil
}

// MarshalYAML emits the single concrete output specification carried by m.
//
// The candidates are checked in a fixed order — binary classification, ranking,
// regression — and the first non-nil one is returned as the value to marshal.
// An error is returned when none of them is set.
func (m ModelPredictionOutput) MarshalYAML() (interface{}, error) {
	switch {
	case m.BinaryClassificationOutput != nil:
		return m.BinaryClassificationOutput, nil
	case m.RankingOutput != nil:
		return m.RankingOutput, nil
	case m.RegressionOutput != nil:
		return m.RegressionOutput, nil
	}
	return nil, fmt.Errorf("not valid model prediction output")
}

// BinaryClassificationOutput is specification for prediction of binary classification model
type BinaryClassificationOutput struct {
ActualScoreColumn string `json:"actual_score_column"`
NegativeClassLabel string `json:"negative_class_label"`
PredictionScoreColumn string `json:"prediction_score_column"`
PredictionLabelColumn string `json:"prediction_label_column"`
PositiveClassLabel string `json:"positive_class_label"`
ScoreThreshold *float64 `json:"score_threshold,omitempty"`
OutputClass ModelPredictionOutputClass `json:"output_class" validate:"required"`
ActualScoreColumn string `json:"actual_score_column" yaml:"actual_score_column"`
NegativeClassLabel string `json:"negative_class_label" yaml:"negative_class_label"`
PredictionScoreColumn string `json:"prediction_score_column" yaml:"prediction_score_column"`
PredictionLabelColumn string `json:"prediction_label_column" yaml:"prediction_label_column"`
PositiveClassLabel string `json:"positive_class_label" yaml:"positive_class_label"`
ScoreThreshold *float64 `json:"score_threshold,omitempty" yaml:"score_threshold"`
OutputClass ModelPredictionOutputClass `json:"output_class" yaml:"output_class" validate:"required"`
}

// RankingOutput is specification for prediction of ranking model
type RankingOutput struct {
RankScoreColumn string `json:"rank_score_column"`
RelevanceScoreColumn string `json:"relevance_score_column"`
OutputClass ModelPredictionOutputClass `json:"output_class" validate:"required"`
RankScoreColumn string `json:"rank_score_column" yaml:"rank_score_column"`
RelevanceScoreColumn string `json:"relevance_score_column" yaml:"relevance_score_column"`
OutputClass ModelPredictionOutputClass `json:"output_class" yaml:"output_class" validate:"required"`
}

// Regression is specification for prediction of regression model
type RegressionOutput struct {
PredictionScoreColumn string `json:"prediction_score_column"`
ActualScoreColumn string `json:"actual_score_column"`
OutputClass ModelPredictionOutputClass `json:"output_class" validate:"required"`
PredictionScoreColumn string `json:"prediction_score_column" yaml:"prediction_score_column"`
ActualScoreColumn string `json:"actual_score_column" yaml:"actual_score_column"`
OutputClass ModelPredictionOutputClass `json:"output_class" yaml:"output_class" validate:"required"`
}
68 changes: 68 additions & 0 deletions api/models/observability_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package models

import (
"fmt"
)

// PublisherStatus is the string-typed status stored in
// ObservabilityPublisher.Status.
type PublisherStatus string

// Known PublisherStatus values.
// NOTE(review): the allowed transitions between these states are not visible
// in this file — confirm against the deployment worker.
const (
Pending PublisherStatus = "pending"
Running PublisherStatus = "running"
Failed PublisherStatus = "failed"
Terminated PublisherStatus = "terminated"
)

// ObservabilityPublisher is the record describing an observability publisher
// for a model version: which model and version it belongs to, its revision,
// its status and the model schema spec it was created with. It embeds
// CreatedUpdated.
//
// NOTE(review): the gorm tags below omit the `column:` key (e.g. `gorm:"id"`
// rather than `gorm:"column:id"`); the column names presumably still resolve
// through GORM's default snake_case naming — confirm.
type ObservabilityPublisher struct {
ID ID `gorm:"id"`
// VersionModelID and VersionID together identify the model version this
// publisher belongs to.
VersionModelID ID `gorm:"version_model_id"`
VersionID ID `gorm:"version_id"`
// Revision is copied into WorkerData.Revision by NewWorkerData.
Revision int `gorm:"revision"`
Status PublisherStatus `gorm:"status"`
// ModelSchemaSpec is copied into WorkerData.ModelSchemaSpec by NewWorkerData.
ModelSchemaSpec *SchemaSpec `gorm:"model_schema_spec"`
CreatedUpdated
}

// ActionType is the string-typed action carried by an ObservabilityPublisherJob.
type ActionType string

// Known ActionType values. Note that UndeployPublisher is spelled "delete",
// not "undeploy", in its string form.
const (
DeployPublisher ActionType = "deploy"
UndeployPublisher ActionType = "delete"
)

// WorkerData is the payload assembled by NewWorkerData from a model version
// and its ObservabilityPublisher record.
type WorkerData struct {
// Namespace is set to the name of the model's project.
Namespace string
// ModelSchemaSpec is taken from the publisher record.
ModelSchemaSpec *SchemaSpec
// Metadata carries the app/component name plus the project's stream, team
// and labels.
Metadata Metadata
ModelName string
// ModelVersion is the string form of the version ID.
ModelVersion string
// Revision is taken from the publisher record.
Revision int
// TopicSource is the prediction-log topic computed for the model version.
TopicSource string
}

// NewWorkerData assembles the WorkerData for the given model version and its
// observability publisher record.
//
// Model and project attributes (name, namespace, stream, team, labels) come
// from modelVersion.Model; the schema spec and revision come from
// observabilityPublisher; the topic source is computed from the project name,
// model name and version ID.
func NewWorkerData(modelVersion *Version, observabilityPublisher *ObservabilityPublisher) *WorkerData {
	model := modelVersion.Model
	versionID := modelVersion.ID.String()

	// The worker is labelled after the model it publishes for.
	metadata := Metadata{
		App:       fmt.Sprintf("%s-observability-publisher", model.Name),
		Component: "worker",
		Stream:    model.Project.Stream,
		Team:      model.Project.Team,
		Labels:    model.Project.Labels,
	}

	data := &WorkerData{
		Namespace:       model.Project.Name,
		ModelSchemaSpec: observabilityPublisher.ModelSchemaSpec,
		Metadata:        metadata,
		ModelName:       model.Name,
		ModelVersion:    versionID,
		Revision:        observabilityPublisher.Revision,
		TopicSource:     getPredictionLogTopicForVersion(model.Project.Name, model.Name, versionID),
	}
	return data
}

// ObservabilityPublisherJob bundles an action with the publisher record and
// the worker data it applies to.
// NOTE(review): presumably this is the payload of the queued observability
// publisher deployment job — confirm against the queue producer/consumer.
type ObservabilityPublisherJob struct {
// ActionType is either DeployPublisher or UndeployPublisher.
ActionType ActionType
Publisher *ObservabilityPublisher
WorkerData *WorkerData
}
Loading
Loading