diff --git a/.github/workflows/merlin.yml b/.github/workflows/merlin.yml index 0a91eec6d..a45a06cf3 100644 --- a/.github/workflows/merlin.yml +++ b/.github/workflows/merlin.yml @@ -365,7 +365,7 @@ jobs: LOCAL_REGISTRY_PORT: 12345 LOCAL_REGISTRY: "dev.localhost" INGRESS_HOST: "127.0.0.1.nip.io" - MERLIN_CHART_VERSION: 0.11.4 + MERLIN_CHART_VERSION: 0.13.4 E2E_PYTHON_VERSION: "3.10.6" K3S_VERSION: v1.26.7-k3s1 steps: diff --git a/api/api/validator.go b/api/api/validator.go new file mode 100644 index 000000000..39f035b7a --- /dev/null +++ b/api/api/validator.go @@ -0,0 +1,156 @@ +package api + +import ( + "context" + "fmt" + + "github.com/caraml-dev/merlin/config" + "github.com/caraml-dev/merlin/models" + "github.com/caraml-dev/merlin/pkg/protocol" + "github.com/caraml-dev/merlin/service" + "github.com/feast-dev/feast/sdk/go/protos/feast/core" +) + +type requestValidator interface { + validate() error +} + +type funcValidate struct { + f func() error +} + +func newFuncValidate(f func() error) *funcValidate { + return &funcValidate{ + f: f, + } +} + +func (fv *funcValidate) validate() error { + return fv.f() +} + +var supportedUPIModelTypes = map[string]bool{ + models.ModelTypePyFunc: true, + models.ModelTypeCustom: true, + models.ModelTypePyFuncV3: true, +} + +func isModelSupportUPI(model *models.Model) bool { + _, isSupported := supportedUPIModelTypes[model.Type] + + return isSupported +} + +func validateRequest(validators ...requestValidator) error { + for _, validator := range validators { + if err := validator.validate(); err != nil { + return err + } + } + return nil +} + +func customModelValidation(model *models.Model, version *models.Version) requestValidator { + return newFuncValidate(func() error { + if model.Type == models.ModelTypeCustom { + if err := validateCustomPredictor(version); err != nil { + return err + } + } + return nil + }) +} + +func upiModelValidation(model *models.Model, endpointProtocol protocol.Protocol) requestValidator { + return newFuncValidate(func() error { + if !isModelSupportUPI(model) && endpointProtocol == protocol.UpiV1 { + return fmt.Errorf("%s model is not supported by UPI", model.Type) + } + return nil + }) +} + +func newVersionEndpointValidation(version *models.Version, envName string) requestValidator { + return newFuncValidate(func() error { + endpoint, ok := version.GetEndpointByEnvironmentName(envName) + if ok && (endpoint.IsRunning() || endpoint.IsServing()) { + return fmt.Errorf("there is `%s` deployment for the model version", endpoint.Status) + } + return nil + }) +} + +func deploymentQuotaValidation(ctx context.Context, model *models.Model, env *models.Environment, endpointSvc service.EndpointsService) requestValidator { + return newFuncValidate(func() error { + deployedModelVersionCount, err := endpointSvc.CountEndpoints(ctx, env, model) + if err != nil { + return fmt.Errorf("unable to count number of endpoints in env %s: %w", env.Name, err) + } + + if deployedModelVersionCount >= config.MaxDeployedVersion { + return fmt.Errorf("max deployed endpoint reached. 
Max: %d Current: %d, undeploy existing endpoint before continuing", config.MaxDeployedVersion, deployedModelVersionCount) + } + return nil + }) +} + +func transformerValidation( + ctx context.Context, + endpoint *models.VersionEndpoint, + stdTransformerCfg config.StandardTransformerConfig, + feastCore core.CoreServiceClient) requestValidator { + return newFuncValidate(func() error { + if endpoint.Transformer != nil && endpoint.Transformer.Enabled { + err := validateTransformer(ctx, endpoint, stdTransformerCfg, feastCore) + if err != nil { + return fmt.Errorf("Error validating transformer: %w", err) + } + } + return nil + }) +} + +func updateRequestValidation(prev *models.VersionEndpoint, new *models.VersionEndpoint) requestValidator { + return newFuncValidate(func() error { + if prev.EnvironmentName != new.EnvironmentName { + return fmt.Errorf("updating environment is not allowed, previous: %s, new: %s", prev.EnvironmentName, new.EnvironmentName) + } + + if prev.Status == models.EndpointPending { + return fmt.Errorf("updating endpoint status to %s is not allowed when the endpoint is currently in the pending state", new.Status) + } + + if new.Status != prev.Status { + if prev.Status == models.EndpointServing { + return fmt.Errorf("updating endpoint status to %s is not allowed when the endpoint is currently in the serving state", new.Status) + } + + if new.Status != models.EndpointRunning && new.Status != models.EndpointTerminated { + return fmt.Errorf("updating endpoint status to %s is not allowed", new.Status) + } + } + return nil + }) +} + +func deploymentModeValidation(prev *models.VersionEndpoint, new *models.VersionEndpoint) requestValidator { + return newFuncValidate(func() error { + // Should not allow changing the deployment mode of a pending/running/serving model for 2 reasons: + // * For "serving" models it's risky, as we can't guarantee graceful re-deployment + // * Kserve uses slightly different deployment resource naming under the hood and doesn't clean up the older deployment + if (prev.IsRunning() || prev.IsServing()) && new.DeploymentMode != "" && + new.DeploymentMode != prev.DeploymentMode { + return fmt.Errorf("changing deployment type of a %s model is not allowed, please terminate it first", prev.Status) + } + return nil + }) +} + +func modelObservabilityValidation(endpoint *models.VersionEndpoint, model *models.Model) requestValidator { + return newFuncValidate(func() error { + if endpoint.EnableModelObservability && model.Type != models.ModelTypePyFuncV3 { + return fmt.Errorf("model type should be pyfunc_v3 to enable model observability") + } + return nil + }) +}
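The new validator.go gathers every pre-deployment check behind one tiny interface, so the handlers below can declare an ordered rule list and fail fast on the first violation. A minimal, self-contained sketch of the same composition (the interface, adapter, and runner names match the file above; the quota rule is a toy stand-in, not the real check):

    package main

    import (
        "errors"
        "fmt"
    )

    // requestValidator and funcValidate mirror the declarations in validator.go:
    // any closure can be adapted into a validation rule.
    type requestValidator interface {
        validate() error
    }

    type funcValidate struct {
        f func() error
    }

    func (fv *funcValidate) validate() error { return fv.f() }

    // validateRequest runs the rules in order and stops at the first failure,
    // which is why rule order in the handlers matters.
    func validateRequest(validators ...requestValidator) error {
        for _, v := range validators {
            if err := v.validate(); err != nil {
                return err
            }
        }
        return nil
    }

    func main() {
        quotaExceeded := true // toy stand-in for the real endpoint-count lookup
        rules := []requestValidator{
            &funcValidate{f: func() error { return nil }}, // a passing rule
            &funcValidate{f: func() error {
                if quotaExceeded {
                    return errors.New("max deployed endpoint reached")
                }
                return nil
            }},
        }
        fmt.Println(validateRequest(rules...)) // prints: max deployed endpoint reached
    }

Because each rule is just a closure over the request being validated, a handler can assemble the slice conditionally; UpdateEndpoint below appends the transformer and deployment-mode rules only when the new status is running or serving.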
diff --git a/api/api/version_endpoints_api.go b/api/api/version_endpoints_api.go index eb6e0c4c6..10da316cf 100644 --- a/api/api/version_endpoints_api.go +++ b/api/api/version_endpoints_api.go @@ -22,6 +22,7 @@ import ( merror "github.com/caraml-dev/merlin/pkg/errors" "github.com/caraml-dev/merlin/pkg/protocol" + "github.com/feast-dev/feast/sdk/go/protos/feast/core" "github.com/google/uuid" "google.golang.org/protobuf/encoding/protojson" "gorm.io/gorm" @@ -126,15 +127,6 @@ func (c *EndpointsController) CreateEndpoint(r *http.Request, vars map[string]st } return InternalServerError(fmt.Sprintf("Error getting model / version: %v", err)) } - - // validate custom predictor - if model.Type == models.ModelTypeCustom { - err := c.validateCustomPredictor(ctx, version) - if err != nil { - return BadRequest(fmt.Sprintf("Error validating custom predictor: %v", err)) - } - } - env, err := c.AppContext.EnvironmentService.GetDefaultEnvironment() if err != nil { return InternalServerError(fmt.Sprintf("Unable to find default environment, specify environment target for deployment: %v", err)) } @@ -164,38 +156,20 @@ func (c *EndpointsController) CreateEndpoint(r *http.Request, vars map[string]st newEndpoint.EnvironmentName = env.Name } - // check that UPI is supported - if !isModelSupportUPI(model) && newEndpoint.Protocol == protocol.UpiV1 { - return BadRequest( - fmt.Sprintf("%s model is not supported by UPI", model.Type)) + validationRules := []requestValidator{ + customModelValidation(model, version), + upiModelValidation(model, newEndpoint.Protocol), + newVersionEndpointValidation(version, env.Name), + deploymentQuotaValidation(ctx, model, env, c.EndpointsService), + transformerValidation(ctx, newEndpoint, c.StandardTransformerConfig, c.FeastCoreClient), + modelObservabilityValidation(newEndpoint, model), } - // check that the endpoint is not deployed nor deploying - endpoint, ok := version.GetEndpointByEnvironmentName(env.Name) - if ok && (endpoint.IsRunning() || endpoint.IsServing()) { - return BadRequest( - fmt.Sprintf("There is `%s` deployment for the model version", endpoint.Status)) + if err := validateRequest(validationRules...); err != nil { + return BadRequest(fmt.Sprintf("Request validation failed: %v", err)) } - // check that the model version quota - deployedModelVersionCount, err := c.EndpointsService.CountEndpoints(ctx, env, model) - if err != nil { - return InternalServerError(fmt.Sprintf("Unable to count number of endpoints in env %s: %v", env.Name, err)) - } - - if deployedModelVersionCount >= config.MaxDeployedVersion { - return BadRequest(fmt.Sprintf("Max deployed endpoint reached. Max: %d Current: %d, undeploy existing endpoint before continuing", config.MaxDeployedVersion, deployedModelVersionCount)) - } - - // validate transformer - if newEndpoint.Transformer != nil && newEndpoint.Transformer.Enabled { - err := c.validateTransformer(ctx, newEndpoint.Transformer, newEndpoint.Protocol, newEndpoint.Logger) - if err != nil { - return BadRequest(fmt.Sprintf("Error validating transformer: %v", err)) - } - } - - endpoint, err = c.EndpointsService.DeployEndpoint(ctx, env, model, version, newEndpoint) + endpoint, err := c.EndpointsService.DeployEndpoint(ctx, env, model, version, newEndpoint) if err != nil { if errors.Is(err, merror.InvalidInputError) { return BadRequest(fmt.Sprintf("Unable to process model version input: %v", err)) @@ -206,17 +180,6 @@ func (c *EndpointsController) CreateEndpoint(r *http.Request, vars map[string]st return Created(endpoint) } -var supportedUPIModelTypes = map[string]bool{ - models.ModelTypePyFunc: true, - models.ModelTypeCustom: true, -} - -func isModelSupportUPI(model *models.Model) bool { - _, isSupported := supportedUPIModelTypes[model.Type] - - return isSupported -} - // UpdateEndpoint updates an existing endpoint, i.e.
trigger redeployment func (c *EndpointsController) UpdateEndpoint(r *http.Request, vars map[string]string, body interface{}) *Response { ctx := r.Context() @@ -233,14 +196,6 @@ func (c *EndpointsController) UpdateEndpoint(r *http.Request, vars map[string]st return InternalServerError(fmt.Sprintf("Error getting model / version: %v", err)) } - // validate custom predictor - if model.Type == models.ModelTypeCustom { - err := c.validateCustomPredictor(ctx, version) - if err != nil { - return BadRequest(fmt.Sprintf("Error validating custom predictor: %v", err)) - } - } - endpoint, err := c.EndpointsService.FindByID(ctx, endpointID) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { @@ -254,11 +209,6 @@ func (c *EndpointsController) UpdateEndpoint(r *http.Request, vars map[string]st return BadRequest("Unable to parse body as version endpoint resource") } - err = validateUpdateRequest(endpoint, newEndpoint) - if err != nil { - return BadRequest(fmt.Sprintf("Error validating request: %v", err)) - } - env, err := c.AppContext.EnvironmentService.GetEnvironment(newEndpoint.EnvironmentName) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { @@ -267,22 +217,21 @@ func (c *EndpointsController) UpdateEndpoint(r *http.Request, vars map[string]st return InternalServerError(fmt.Sprintf("Error getting the specified environment: %v", err)) } - if newEndpoint.Status == models.EndpointRunning || newEndpoint.Status == models.EndpointServing { - // validate transformer - if newEndpoint.Transformer != nil && newEndpoint.Transformer.Enabled { - err := c.validateTransformer(ctx, newEndpoint.Transformer, newEndpoint.Protocol, newEndpoint.Logger) - if err != nil { - return BadRequest(fmt.Sprintf("Error validating the transformer: %v", err)) - } - } + validationRules := []requestValidator{ + customModelValidation(model, version), + updateRequestValidation(endpoint, newEndpoint), + modelObservabilityValidation(newEndpoint, model), + } - // Should not allow changing the deployment mode of a pending/running/serving model for 2 reasons: - // * For "serving" models it's risky as, we can't guarantee graceful re-deployment - // * Kserve uses slightly different deployment resource naming under the hood and doesn't clean up the older deployment - if (endpoint.IsRunning() || endpoint.IsServing()) && newEndpoint.DeploymentMode != "" && - newEndpoint.DeploymentMode != endpoint.DeploymentMode { - return BadRequest(fmt.Sprintf("Changing deployment type of a %s model is not allowed, please terminate it first.", - endpoint.Status)) + if newEndpoint.Status == models.EndpointRunning || newEndpoint.Status == models.EndpointServing { + validationRules = append( + validationRules, + transformerValidation(ctx, newEndpoint, c.StandardTransformerConfig, c.FeastCoreClient), + deploymentModeValidation(endpoint, newEndpoint), + ) + + if err := validateRequest(validationRules...); err != nil { + return BadRequest(fmt.Sprintf("Request validation failed: %v", err)) } endpoint, err = c.EndpointsService.DeployEndpoint(ctx, env, model, version, newEndpoint) @@ -294,12 +243,16 @@ func (c *EndpointsController) UpdateEndpoint(r *http.Request, vars map[string]st return InternalServerError(fmt.Sprintf("Unable to deploy model version: %v", err)) } } else if newEndpoint.Status == models.EndpointTerminated { + if err := validateRequest(validationRules...); err != nil { + return BadRequest(fmt.Sprintf("Request validation failed: %v", err)) + } + endpoint, err = c.EndpointsService.UndeployEndpoint(ctx, env, model, version, endpoint) if err != 
nil { return InternalServerError(fmt.Sprintf("Unable to undeploy version endpoint %s: %v", endpointID, err)) } } else { - return InternalServerError(fmt.Sprintf("Updating endpoint status to %s is not allowed", newEndpoint.Status)) + return BadRequest(fmt.Sprintf("Updating endpoint status to %s is not allowed", newEndpoint.Status)) } return Ok(endpoint) @@ -392,39 +345,20 @@ func (c *EndpointsController) ListContainers(r *http.Request, vars map[string]st return Ok(containers) } -func validateUpdateRequest(prev *models.VersionEndpoint, new *models.VersionEndpoint) error { - if prev.EnvironmentName != new.EnvironmentName { - return fmt.Errorf("Updating environment is not allowed, previous: %s, new: %s", prev.EnvironmentName, new.EnvironmentName) - } - - if prev.Status == models.EndpointPending { - return fmt.Errorf("Updating endpoint status to %s is not allowed when the endpoint is currently in the pending state", new.Status) - } - - if new.Status != prev.Status { - if prev.Status == models.EndpointServing { - return fmt.Errorf("Updating endpoint status to %s is not allowed when the endpoint is currently in the serving state", new.Status) - } - - if new.Status != models.EndpointRunning && new.Status != models.EndpointTerminated { - return fmt.Errorf("Updating endpoint status to %s is not allowed", new.Status) - } - } - - return nil -} - -func (c *EndpointsController) validateTransformer(ctx context.Context, trans *models.Transformer, protocol protocol.Protocol, logger *models.Logger) error { +func validateTransformer(ctx context.Context, endpoint *models.VersionEndpoint, stdTransformerConfig config.StandardTransformerConfig, feastCore core.CoreServiceClient) error { + trans := endpoint.Transformer + protocol := endpoint.Protocol + logger := endpoint.Logger switch trans.TransformerType { case models.CustomTransformerType, models.DefaultTransformerType: if trans.Image == "" { - return errors.New("Transformer image name is not specified") + return errors.New("transformer image name is not specified") } case models.StandardTransformerType: envVars := trans.EnvVars.ToMap() cfg, ok := envVars[transformer.StandardTransformerConfigEnvName] if !ok { - return errors.New("Standard transformer config is not specified") + return errors.New("standard transformer config is not specified") } var predictionLogCfg *spec.PredictionLogConfig @@ -432,15 +366,18 @@ func (c *EndpointsController) validateTransformer(ctx context.Context, trans *mo predictionLogCfg = logger.Prediction.ToPredictionLogConfig() } - return c.validateStandardTransformerConfig(ctx, cfg, protocol, predictionLogCfg) + feastOptions := &feast.Options{ + StorageConfigs: stdTransformerConfig.ToFeastStorageConfigs(), + } + return validateStandardTransformerConfig(ctx, cfg, protocol, predictionLogCfg, feastOptions, feastCore) default: - return fmt.Errorf("Unknown transformer type: %s", trans.TransformerType) + return fmt.Errorf("unknown transformer type: %s", trans.TransformerType) } return nil } -func (c *EndpointsController) validateCustomPredictor(ctx context.Context, version *models.Version) error { +func validateCustomPredictor(version *models.Version) error { customPredictor := version.CustomPredictor if customPredictor == nil { return errors.New("custom predictor must be specified") @@ -448,18 +385,14 @@ func (c *EndpointsController) validateCustomPredictor(ctx context.Context, versi return customPredictor.IsValid() } -func (c *EndpointsController) validateStandardTransformerConfig(ctx context.Context, cfg string, protocol protocol.Protocol, 
predictionLogConfig *spec.PredictionLogConfig) error { +func validateStandardTransformerConfig(ctx context.Context, cfg string, protocol protocol.Protocol, predictionLogConfig *spec.PredictionLogConfig, feastOpts *feast.Options, feastCore core.CoreServiceClient) error { stdTransformerConfig := &spec.StandardTransformerConfig{} err := protojson.Unmarshal([]byte(cfg), stdTransformerConfig) if err != nil { return err } - feastOptions := &feast.Options{ - StorageConfigs: c.StandardTransformerConfig.ToFeastStorageConfigs(), - } - stdTransformerConfig.PredictionLogConfig = predictionLogConfig - return pipeline.ValidateTransformerConfig(ctx, c.FeastCoreClient, stdTransformerConfig, feastOptions, protocol) + return pipeline.ValidateTransformerConfig(ctx, feastCore, stdTransformerConfig, feastOpts, protocol) } diff --git a/api/api/version_endpoints_api_test.go b/api/api/version_endpoints_api_test.go index e7c0e781c..8d2067c01 100644 --- a/api/api/version_endpoints_api_test.go +++ b/api/api/version_endpoints_api_test.go @@ -1042,6 +1042,288 @@ func TestCreateEndpoint(t *testing.T) { }, }, }, + { + desc: "Should success create endpoint with pyfunc_v3 model observability enabled", + vars: map[string]string{ + "model_id": "1", + "version_id": "1", + }, + requestBody: &models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + ServiceName: "sample", + Namespace: "sample", + EnvironmentName: "dev", + Message: "", + ResourceRequest: &models.ResourceRequest{ + MinReplica: 1, + MaxReplica: 4, + CPURequest: resource.MustParse("1"), + MemoryRequest: resource.MustParse("1Gi"), + }, + EnvVars: models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + EnableModelObservability: true, + }, + modelService: func() *mocks.ModelsService { + svc := &mocks.ModelsService{} + svc.On("FindByID", mock.Anything, models.ID(1)).Return(&models.Model{ + ID: models.ID(1), + Name: "model-1", + ProjectID: models.ID(1), + Project: mlp.Project{}, + ExperimentID: 1, + Type: "pyfunc_v3", + MlflowURL: "", + Endpoints: nil, + }, nil) + return svc + }, + versionService: func() *mocks.VersionsService { + svc := &mocks.VersionsService{} + svc.On("FindByID", mock.Anything, models.ID(1), models.ID(1), mock.Anything).Return(&models.Version{ + ID: models.ID(1), + ModelID: models.ID(1), + Model: &models.Model{ + ID: models.ID(1), + Name: "model-1", + ProjectID: models.ID(1), + Project: mlp.Project{}, + ExperimentID: 1, + Type: "pyfunc", + MlflowURL: "", + Endpoints: nil, + }, + }, nil) + return svc + }, + envService: func() *mocks.EnvironmentService { + svc := &mocks.EnvironmentService{} + svc.On("GetDefaultEnvironment").Return(&models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, nil) + svc.On("GetEnvironment", "dev").Return(&models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, nil) + return svc + }, + endpointService: func() *mocks.EndpointsService { + svc := &mocks.EndpointsService{} + svc.On("CountEndpoints", context.Background(), mock.Anything, mock.Anything).Return(0, nil) + svc.On("DeployEndpoint", context.Background(), mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + Status: models.EndpointRunning, + URL: 
"http://endpoint.svc", + ServiceName: "sample", + InferenceServiceName: "sample", + Namespace: "sample", + Environment: &models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, + EnvironmentName: "dev", + Message: "", + ResourceRequest: nil, + EnvVars: models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + CreatedUpdated: models.CreatedUpdated{}, + }, nil) + return svc + }, + monitoringConfig: config.MonitoringConfig{}, + feastCoreMock: func() *feastmocks.CoreServiceClient { + return &feastmocks.CoreServiceClient{} + }, + expected: &Response{ + code: http.StatusCreated, + data: &models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + Status: models.EndpointRunning, + URL: "http://endpoint.svc", + ServiceName: "sample", + InferenceServiceName: "sample", + Namespace: "sample", + Environment: &models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, + EnvironmentName: "dev", + Message: "", + ResourceRequest: nil, + EnvVars: models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + CreatedUpdated: models.CreatedUpdated{}, + }, + }, + }, + { + desc: "Should failed create endpoint with non pyfunc_v3 and model observability enabled", + vars: map[string]string{ + "model_id": "1", + "version_id": "1", + }, + requestBody: &models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + ServiceName: "sample", + Namespace: "sample", + EnvironmentName: "dev", + Message: "", + ResourceRequest: &models.ResourceRequest{ + MinReplica: 1, + MaxReplica: 4, + CPURequest: resource.MustParse("1"), + MemoryRequest: resource.MustParse("1Gi"), + }, + EnvVars: models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + EnableModelObservability: true, + }, + modelService: func() *mocks.ModelsService { + svc := &mocks.ModelsService{} + svc.On("FindByID", mock.Anything, models.ID(1)).Return(&models.Model{ + ID: models.ID(1), + Name: "model-1", + ProjectID: models.ID(1), + Project: mlp.Project{}, + ExperimentID: 1, + Type: "pyfunc", + MlflowURL: "", + Endpoints: nil, + }, nil) + return svc + }, + versionService: func() *mocks.VersionsService { + svc := &mocks.VersionsService{} + svc.On("FindByID", mock.Anything, models.ID(1), models.ID(1), mock.Anything).Return(&models.Version{ + ID: models.ID(1), + ModelID: models.ID(1), + Model: &models.Model{ + ID: models.ID(1), + Name: "model-1", + ProjectID: models.ID(1), + Project: mlp.Project{}, + ExperimentID: 1, + Type: "pyfunc", + MlflowURL: "", + Endpoints: nil, + }, + }, nil) + return svc + }, + envService: func() *mocks.EnvironmentService { + svc := &mocks.EnvironmentService{} + svc.On("GetDefaultEnvironment").Return(&models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, nil) + svc.On("GetEnvironment", "dev").Return(&models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, nil) + return svc + }, + endpointService: func() *mocks.EndpointsService { + svc := &mocks.EndpointsService{} + svc.On("CountEndpoints", context.Background(), 
mock.Anything, mock.Anything).Return(0, nil) + svc.On("DeployEndpoint", context.Background(), mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + Status: models.EndpointRunning, + URL: "http://endpoint.svc", + ServiceName: "sample", + InferenceServiceName: "sample", + Namespace: "sample", + Environment: &models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, + EnvironmentName: "dev", + Message: "", + ResourceRequest: nil, + EnvVars: models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + CreatedUpdated: models.CreatedUpdated{}, + }, nil) + return svc + }, + monitoringConfig: config.MonitoringConfig{}, + feastCoreMock: func() *feastmocks.CoreServiceClient { + return &feastmocks.CoreServiceClient{} + }, + expected: BadRequest("Request validation failed: model type should be pyfunc_v3 to enable model observability"), + }, { desc: "Should return 400 if UPI is not supported", vars: map[string]string{ @@ -1139,7 +1421,7 @@ func TestCreateEndpoint(t *testing.T) { }, expected: &Response{ code: http.StatusBadRequest, - data: Error{Message: "tensorflow model is not supported by UPI"}, + data: Error{Message: "Request validation failed: tensorflow model is not supported by UPI"}, }, }, { @@ -1522,7 +1804,7 @@ func TestCreateEndpoint(t *testing.T) { }, expected: &Response{ code: http.StatusBadRequest, - data: Error{Message: "Max deployed endpoint reached. Max: 2 Current: 5, undeploy existing endpoint before continuing"}, + data: Error{Message: "Request validation failed: max deployed endpoint reached. 
Max: 2 Current: 5, undeploy existing endpoint before continuing"}, }, }, { @@ -2026,6 +2308,26 @@ func TestCreateEndpoint(t *testing.T) { }, envService: func() *mocks.EnvironmentService { svc := &mocks.EnvironmentService{} + svc.On("GetDefaultEnvironment").Return(&models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, nil) + svc.On("GetEnvironment", "dev").Return(&models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, nil) return svc }, endpointService: func() *mocks.EndpointsService { @@ -2041,7 +2343,7 @@ func TestCreateEndpoint(t *testing.T) { }, expected: &Response{ code: http.StatusBadRequest, - data: Error{Message: "Error validating custom predictor: custom predictor image must be set"}, + data: Error{Message: "Request validation failed: custom predictor image must be set"}, }, }, { @@ -2819,7 +3121,7 @@ func TestCreateEndpoint(t *testing.T) { expected: &Response{ code: http.StatusBadRequest, data: Error{ - Message: "Error validating transformer: variable rawFeatures is not registered", + Message: "Request validation failed: Error validating transformer: variable rawFeatures is not registered", }, }, }, @@ -3270,63 +3572,237 @@ func TestCreateEndpoint(t *testing.T) { RedisAddresses: []string{ "10.1.1.2", "10.1.1.3", }, - PoolSize: 5, + PoolSize: 5, + }, + }, + feastCoreMock: func() *feastmocks.CoreServiceClient { + return &feastmocks.CoreServiceClient{} + }, + expected: &Response{ + code: http.StatusBadRequest, + data: Error{Message: "Request validation failed: Error validating transformer: feast source configuration is not valid, servingURL: localhost:6565 source: UNKNOWN"}, + }, + }, + } + for _, tC := range testCases { + t.Run(tC.desc, func(t *testing.T) { + modelSvc := tC.modelService() + versionSvc := tC.versionService() + envSvc := tC.envService() + endpointSvc := tC.endpointService() + feastCoreMock := tC.feastCoreMock() + + ctl := &EndpointsController{ + AppContext: &AppContext{ + ModelsService: modelSvc, + VersionsService: versionSvc, + EnvironmentService: envSvc, + EndpointsService: endpointSvc, + FeatureToggleConfig: config.FeatureToggleConfig{ + AlertConfig: config.AlertConfig{ + AlertEnabled: true, + }, + MonitoringConfig: tC.monitoringConfig, + }, + StandardTransformerConfig: tC.standardTransformerConfig, + FeastCoreClient: feastCoreMock, + }, + } + resp := ctl.CreateEndpoint(&http.Request{}, tC.vars, tC.requestBody) + assertEqualResponses(t, tC.expected, resp) + }) + } +} + +func TestUpdateEndpoint(t *testing.T) { + uuid := uuid.New() + trueBoolean := true + testCases := []struct { + desc string + vars map[string]string + requestBody *models.VersionEndpoint + modelService func() *mocks.ModelsService + versionService func() *mocks.VersionsService + endpointService func() *mocks.EndpointsService + envService func() *mocks.EnvironmentService + expected *Response + }{ + { + desc: "Should success update endpoint", + vars: map[string]string{ + "model_id": "1", + "version_id": "1", + "endpoint_id": uuid.String(), + }, + requestBody: &models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + Status: models.EndpointRunning, + ServiceName: "sample", + Namespace: "sample", + EnvironmentName: "dev", + Message: "", + ResourceRequest: &models.ResourceRequest{ + MinReplica: 1, + MaxReplica: 4, + 
CPURequest: resource.MustParse("1"), + MemoryRequest: resource.MustParse("1Gi"), + }, + EnvVars: models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + }, + modelService: func() *mocks.ModelsService { + svc := &mocks.ModelsService{} + svc.On("FindByID", context.Background(), models.ID(1)).Return(&models.Model{ + ID: models.ID(1), + Name: "model-1", + ProjectID: models.ID(1), + Project: mlp.Project{}, + ExperimentID: 1, + Type: "pyfunc", + MlflowURL: "", + Endpoints: nil, + }, nil) + return svc + }, + versionService: func() *mocks.VersionsService { + svc := &mocks.VersionsService{} + svc.On("FindByID", context.Background(), models.ID(1), models.ID(1), mock.Anything).Return(&models.Version{ + ID: models.ID(1), + ModelID: models.ID(1), + Model: &models.Model{ + ID: models.ID(1), + Name: "model-1", + ProjectID: models.ID(1), + Project: mlp.Project{}, + ExperimentID: 1, + Type: "pyfunc", + MlflowURL: "", + Endpoints: nil, + }, + }, nil) + return svc + }, + envService: func() *mocks.EnvironmentService { + svc := &mocks.EnvironmentService{} + svc.On("GetEnvironment", "dev").Return(&models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, nil) + return svc + }, + endpointService: func() *mocks.EndpointsService { + svc := &mocks.EndpointsService{} + svc.On("FindByID", context.Background(), uuid).Return(&models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + Status: models.EndpointRunning, + ServiceName: "sample", + InferenceServiceName: "sample", + Namespace: "sample", + URL: "http://endpoint.svc", + MonitoringURL: "http://monitoring.com", + Environment: &models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, EnvironmentName: "dev", + Message: "", + ResourceRequest: nil, + EnvVars: models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + }, nil) + svc.On("DeployEndpoint", context.Background(), mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + Status: models.EndpointRunning, + URL: "http://endpoint.svc", + ServiceName: "sample", + InferenceServiceName: "sample", + Namespace: "sample", + MonitoringURL: "http://monitoring.com", + Environment: &models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, + EnvironmentName: "dev", + Message: "", + ResourceRequest: nil, + EnvVars: models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + CreatedUpdated: models.CreatedUpdated{}, + }, nil) + return svc + }, + expected: &Response{ + code: http.StatusOK, + data: &models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + Status: models.EndpointRunning, + URL: "http://endpoint.svc", + ServiceName: "sample", + InferenceServiceName: "sample", + Namespace: "sample", + MonitoringURL: "http://monitoring.com", + Environment: &models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, + EnvironmentName: "dev", + Message: "", + ResourceRequest: nil, + EnvVars: 
models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + CreatedUpdated: models.CreatedUpdated{}, }, }, - feastCoreMock: func() *feastmocks.CoreServiceClient { - return &feastmocks.CoreServiceClient{} - }, - expected: &Response{ - code: http.StatusBadRequest, - data: Error{Message: "Error validating transformer: feast source configuration is not valid, servingURL: localhost:6565 source: UNKNOWN"}, - }, }, - } - for _, tC := range testCases { - t.Run(tC.desc, func(t *testing.T) { - modelSvc := tC.modelService() - versionSvc := tC.versionService() - envSvc := tC.envService() - endpointSvc := tC.endpointService() - feastCoreMock := tC.feastCoreMock() - - ctl := &EndpointsController{ - AppContext: &AppContext{ - ModelsService: modelSvc, - VersionsService: versionSvc, - EnvironmentService: envSvc, - EndpointsService: endpointSvc, - FeatureToggleConfig: config.FeatureToggleConfig{ - AlertConfig: config.AlertConfig{ - AlertEnabled: true, - }, - MonitoringConfig: tC.monitoringConfig, - }, - StandardTransformerConfig: tC.standardTransformerConfig, - FeastCoreClient: feastCoreMock, - }, - } - resp := ctl.CreateEndpoint(&http.Request{}, tC.vars, tC.requestBody) - assertEqualResponses(t, tC.expected, resp) - }) - } -} - -func TestUpdateEndpoint(t *testing.T) { - uuid := uuid.New() - trueBoolean := true - testCases := []struct { - desc string - vars map[string]string - requestBody *models.VersionEndpoint - modelService func() *mocks.ModelsService - versionService func() *mocks.VersionsService - endpointService func() *mocks.EndpointsService - envService func() *mocks.EnvironmentService - expected *Response - }{ { - desc: "Should success update endpoint", + desc: "Should success update endpoint, pyfunc_v3 and model observablity enabled", vars: map[string]string{ "model_id": "1", "version_id": "1", @@ -3353,6 +3829,7 @@ func TestUpdateEndpoint(t *testing.T) { Value: "1", }, }), + EnableModelObservability: true, }, modelService: func() *mocks.ModelsService { svc := &mocks.ModelsService{} @@ -3362,7 +3839,7 @@ func TestUpdateEndpoint(t *testing.T) { ProjectID: models.ID(1), Project: mlp.Project{}, ExperimentID: 1, - Type: "pyfunc", + Type: "pyfunc_v3", MlflowURL: "", Endpoints: nil, }, nil) @@ -3430,6 +3907,7 @@ func TestUpdateEndpoint(t *testing.T) { Value: "1", }, }), + EnableModelObservability: false, }, nil) svc.On("DeployEndpoint", context.Background(), mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&models.VersionEndpoint{ ID: uuid, @@ -3460,7 +3938,8 @@ func TestUpdateEndpoint(t *testing.T) { Value: "1", }, }), - CreatedUpdated: models.CreatedUpdated{}, + CreatedUpdated: models.CreatedUpdated{}, + EnableModelObservability: true, }, nil) return svc }, @@ -3495,7 +3974,8 @@ func TestUpdateEndpoint(t *testing.T) { Value: "1", }, }), - CreatedUpdated: models.CreatedUpdated{}, + CreatedUpdated: models.CreatedUpdated{}, + EnableModelObservability: true, }, }, }, @@ -3660,7 +4140,7 @@ func TestUpdateEndpoint(t *testing.T) { }, expected: &Response{ code: http.StatusBadRequest, - data: Error{Message: "Error validating request: Updating endpoint status to running is not allowed when the endpoint is currently in the serving state"}, + data: Error{Message: "Request validation failed: updating endpoint status to running is not allowed when the endpoint is currently in the serving state"}, }, }, { @@ -3773,7 +4253,7 @@ func TestUpdateEndpoint(t *testing.T) { }, expected: &Response{ code: http.StatusBadRequest, - data: Error{Message: "Error validating request: Updating 
endpoint status to running is not allowed when the endpoint is currently in the pending state"}, + data: Error{Message: "Request validation failed: updating endpoint status to running is not allowed when the endpoint is currently in the pending state"}, }, }, { @@ -3849,6 +4329,16 @@ func TestUpdateEndpoint(t *testing.T) { MaxCPU: "1", MaxMemory: "1Gi", }, nil) + svc.On("GetEnvironment", "staging").Return(&models.Environment{ + ID: models.ID(2), + Name: "staging", + Cluster: "staging", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "staging-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, nil) return svc }, endpointService: func() *mocks.EndpointsService { @@ -3886,7 +4376,7 @@ func TestUpdateEndpoint(t *testing.T) { }, expected: &Response{ code: http.StatusBadRequest, - data: Error{Message: "Error validating request: Updating environment is not allowed, previous: dev, new: staging"}, + data: Error{Message: "Request validation failed: updating environment is not allowed, previous: dev, new: staging"}, }, }, { @@ -3999,7 +4489,122 @@ func TestUpdateEndpoint(t *testing.T) { }, expected: &Response{ code: http.StatusBadRequest, - data: Error{Message: "Error validating request: Updating endpoint status to pending is not allowed"}, + data: Error{Message: "Updating endpoint status to pending is not allowed"}, + }, + }, + { + desc: "Should 400 if new endpoint enable model observability but the model is not pyfunc_v3", + vars: map[string]string{ + "model_id": "1", + "version_id": "1", + "endpoint_id": uuid.String(), + }, + requestBody: &models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + Status: models.EndpointRunning, + ServiceName: "sample", + Namespace: "sample", + EnvironmentName: "dev", + Message: "", + ResourceRequest: &models.ResourceRequest{ + MinReplica: 1, + MaxReplica: 4, + CPURequest: resource.MustParse("1"), + MemoryRequest: resource.MustParse("1Gi"), + }, + EnvVars: models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + EnableModelObservability: true, + }, + modelService: func() *mocks.ModelsService { + svc := &mocks.ModelsService{} + svc.On("FindByID", context.Background(), models.ID(1)).Return(&models.Model{ + ID: models.ID(1), + Name: "model-1", + ProjectID: models.ID(1), + Project: mlp.Project{}, + ExperimentID: 1, + Type: "pyfunc", + MlflowURL: "", + Endpoints: nil, + }, nil) + return svc + }, + versionService: func() *mocks.VersionsService { + svc := &mocks.VersionsService{} + svc.On("FindByID", context.Background(), models.ID(1), models.ID(1), mock.Anything).Return(&models.Version{ + ID: models.ID(1), + ModelID: models.ID(1), + Model: &models.Model{ + ID: models.ID(1), + Name: "model-1", + ProjectID: models.ID(1), + Project: mlp.Project{}, + ExperimentID: 1, + Type: "pyfunc", + MlflowURL: "", + Endpoints: nil, + }, + }, nil) + return svc + }, + envService: func() *mocks.EnvironmentService { + svc := &mocks.EnvironmentService{} + svc.On("GetEnvironment", "dev").Return(&models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, nil) + return svc + }, + endpointService: func() *mocks.EndpointsService { + svc := &mocks.EndpointsService{} + svc.On("FindByID", context.Background(), uuid).Return(&models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + Status: models.EndpointRunning, + ServiceName: "sample", + InferenceServiceName: "sample", + 
Namespace: "sample", + URL: "http://endpoint.svc", + MonitoringURL: "http://monitoring.com", + Environment: &models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, EnvironmentName: "dev", + Message: "", + ResourceRequest: nil, + EnvVars: models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + EnableModelObservability: false, + }, nil) + return svc + }, + expected: &Response{ + code: http.StatusBadRequest, + data: Error{Message: "Request validation failed: model type should be pyfunc_v3 if want to enable model observablity"}, }, }, { @@ -4521,15 +5126,85 @@ func TestUpdateEndpoint(t *testing.T) { }, envService: func() *mocks.EnvironmentService { svc := &mocks.EnvironmentService{} + svc.On("GetEnvironment", "dev").Return(&models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, nil) return svc }, endpointService: func() *mocks.EndpointsService { svc := &mocks.EndpointsService{} + svc.On("FindByID", context.Background(), uuid).Return(&models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + Status: models.EndpointRunning, + ServiceName: "sample", + InferenceServiceName: "sample", + Namespace: "sample", + URL: "http://endpoint.svc", + MonitoringURL: "http://monitoring.com", + Environment: &models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, EnvironmentName: "dev", + Message: "", + ResourceRequest: nil, + EnvVars: models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + }, nil) + svc.On("DeployEndpoint", context.Background(), mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&models.VersionEndpoint{ + ID: uuid, + VersionID: models.ID(1), + VersionModelID: models.ID(1), + Status: models.EndpointRunning, + URL: "http://endpoint.svc", + ServiceName: "sample", + InferenceServiceName: "sample", + Namespace: "sample", + MonitoringURL: "http://monitoring.com", + Environment: &models.Environment{ + ID: models.ID(1), + Name: "dev", + Cluster: "dev", + IsDefault: &trueBoolean, + Region: "id", + GcpProject: "dev-proj", + MaxCPU: "1", + MaxMemory: "1Gi", + }, + EnvironmentName: "dev", + Message: "", + ResourceRequest: nil, + EnvVars: models.EnvVars([]models.EnvVar{ + { + Name: "WORKER", + Value: "1", + }, + }), + CreatedUpdated: models.CreatedUpdated{}, + }, nil) return svc }, expected: &Response{ code: http.StatusBadRequest, - data: Error{Message: "Error validating custom predictor: custom predictor image must be set"}, + data: Error{Message: "Request validation failed: custom predictor image must be set"}, }, }, { @@ -4822,7 +5497,7 @@ func TestUpdateEndpoint(t *testing.T) { }, expected: &Response{ code: http.StatusBadRequest, - data: Error{Message: "Changing deployment type of a serving model is not allowed, please terminate it first."}, + data: Error{Message: "Request validation failed: changing deployment type of a serving model is not allowed, please terminate it first"}, }, }, { @@ -4937,7 +5612,7 @@ func TestUpdateEndpoint(t *testing.T) { }, expected: &Response{ code: http.StatusBadRequest, - data: Error{Message: "Changing deployment type of a running model is not allowed, please terminate it first."}, + data: 
Error{Message: "Request validation failed: changing deployment type of a running model is not allowed, please terminate it first"}, }, }, { @@ -5052,7 +5727,7 @@ func TestUpdateEndpoint(t *testing.T) { }, expected: &Response{ code: http.StatusBadRequest, - data: Error{Message: "Error validating request: Updating endpoint status to running is not allowed when the endpoint is currently in the pending state"}, + data: Error{Message: "Request validation failed: updating endpoint status to running is not allowed when the endpoint is currently in the pending state"}, }, }, { diff --git a/api/cluster/controller.go b/api/cluster/controller.go index 960c78fef..570b46f18 100644 --- a/api/cluster/controller.go +++ b/api/cluster/controller.go @@ -92,7 +92,7 @@ type controller struct { ContainerFetcher } -func NewController(clusterConfig Config, deployConfig config.DeploymentConfig, standardTransformerConfig config.StandardTransformerConfig) (Controller, error) { +func NewController(clusterConfig Config, deployConfig config.DeploymentConfig) (Controller, error) { var cfg *rest.Config var err error if clusterConfig.InClusterConfig { @@ -139,7 +139,7 @@ func NewController(clusterConfig Config, deployConfig config.DeploymentConfig, s GcpProject: clusterConfig.GcpProject, }) - kfServingResourceTemplater := resource.NewInferenceServiceTemplater(standardTransformerConfig) + kfServingResourceTemplater := resource.NewInferenceServiceTemplater(deployConfig) return newController( knsClientSet.ServingV1(), kserveClient, @@ -217,7 +217,7 @@ func (c *controller) Deploy(ctx context.Context, modelService *models.Service) ( } // create new resource - spec, err := c.kfServingResourceTemplater.CreateInferenceServiceSpec(modelService, c.deploymentConfig, deploymentScale) + spec, err := c.kfServingResourceTemplater.CreateInferenceServiceSpec(modelService, deploymentScale) if err != nil { log.Errorf("unable to create inference service spec %s: %v", isvcName, err) return nil, errors.Wrapf(err, fmt.Sprintf("%v (%s)", ErrUnableToCreateInferenceService, isvcName)) diff --git a/api/cluster/controller_test.go b/api/cluster/controller_test.go index fadbb3a34..e3f2f0e33 100644 --- a/api/cluster/controller_test.go +++ b/api/cluster/controller_test.go @@ -311,7 +311,7 @@ func TestController_DeployInferenceService_NamespaceCreation(t *testing.T) { containerFetcher := NewContainerFetcher(v1Client, clusterMetadata) - ctl, _ := newController(knClient, kfClient, v1Client, nil, policyV1Client, istioClient, deployConfig, containerFetcher, nil) + ctl, _ := newController(knClient, kfClient, v1Client, nil, policyV1Client, istioClient, deployConfig, containerFetcher, clusterresource.NewInferenceServiceTemplater(deployConfig)) iSvc, err := ctl.Deploy(context.Background(), modelSvc) if tt.wantError { @@ -671,13 +671,14 @@ func TestController_DeployInferenceService(t *testing.T) { Enabled: true, MaxUnavailablePercentage: &defaultMaxUnavailablePDB, }, + StandardTransformer: config.StandardTransformerConfig{ + ImageName: "ghcr.io/caraml-dev/merlin-transformer-test", + FeastServingKeepAlive: &config.FeastServingKeepAliveConfig{}, + }, } containerFetcher := NewContainerFetcher(v1Client, clusterMetadata) - templater := clusterresource.NewInferenceServiceTemplater(config.StandardTransformerConfig{ - ImageName: "ghcr.io/caraml-dev/merlin-transformer-test", - FeastServingKeepAlive: &config.FeastServingKeepAliveConfig{}, - }) + templater := clusterresource.NewInferenceServiceTemplater(deployConfig) ctl, _ := newController(knClient.ServingV1(), kfClient, 
v1Client, nil, policyV1Client, istioClient, deployConfig, containerFetcher, templater) iSvc, err := ctl.Deploy(context.Background(), tt.modelService) @@ -804,12 +805,14 @@ func TestGetCurrentDeploymentScale(t *testing.T) { v1Client := fake.NewSimpleClientset().CoreV1() policyV1Client := fake.NewSimpleClientset().PolicyV1().(*fakepolicyv1.FakePolicyV1) - deployConfig := config.DeploymentConfig{} + deployConfig := config.DeploymentConfig{ + StandardTransformer: config.StandardTransformerConfig{ + ImageName: "ghcr.io/caraml-dev/merlin-transformer-test", + FeastServingKeepAlive: &config.FeastServingKeepAliveConfig{}, + }, + } containerFetcher := NewContainerFetcher(v1Client, clusterMetadata) - templater := clusterresource.NewInferenceServiceTemplater(config.StandardTransformerConfig{ - ImageName: "ghcr.io/caraml-dev/merlin-transformer-test", - FeastServingKeepAlive: &config.FeastServingKeepAliveConfig{}, - }) + templater := clusterresource.NewInferenceServiceTemplater(deployConfig) // Create test controller ctl, _ := newController(knClient.ServingV1(), kfClient, v1Client, nil, policyV1Client, nil, deployConfig, containerFetcher, templater) @@ -1144,7 +1147,7 @@ func TestController_Delete(t *testing.T) { containerFetcher := NewContainerFetcher(v1Client, clusterMetadata) - templater := clusterresource.NewInferenceServiceTemplater(config.StandardTransformerConfig{}) + templater := clusterresource.NewInferenceServiceTemplater(tt.deployConfig) ctl, _ := newController(knClient, kfClient, v1Client, nil, policyV1Client, istioClient, tt.deployConfig, containerFetcher, templater) mSvc, err := ctl.Delete(context.Background(), tt.modelService) diff --git a/api/cluster/resource/templater.go b/api/cluster/resource/templater.go index 225830d62..6eb68124c 100644 --- a/api/cluster/resource/templater.go +++ b/api/cluster/resource/templater.go @@ -79,6 +79,14 @@ const ( defaultGRPCPort = 9000 defaultPredictorPort = 80 + envPublisherKafkaTopic = "PUBLISHER_KAFKA_TOPIC" + envPublisherKafkaBrokers = "PUBLISHER_KAFKA_BROKERS" + envPublisherEnabled = "PUBLISHER_ENABLED" + envPublisherKafkaLinger = "PUBLISHER_KAFKA_LINGER_MS" + envPublisherKafkaAck = "PUBLISHER_KAFKA_ACKS" + envPublisherSamplingRatio = "PUBLISHER_SAMPLING_RATIO" + envPublisherKafkaConfig = "PUBLISHER_KAFKA_CONFIG" + grpcHealthProbeCommand = "grpc_health_probe" ) @@ -96,15 +104,15 @@ type DeploymentScale struct { } type InferenceServiceTemplater struct { - standardTransformerConfig config.StandardTransformerConfig + deploymentConfig config.DeploymentConfig } -func NewInferenceServiceTemplater(standardTransformerConfig config.StandardTransformerConfig) *InferenceServiceTemplater { - return &InferenceServiceTemplater{standardTransformerConfig: standardTransformerConfig} +func NewInferenceServiceTemplater(deploymentCfg config.DeploymentConfig) *InferenceServiceTemplater { + return &InferenceServiceTemplater{deploymentConfig: deploymentCfg} } -func (t *InferenceServiceTemplater) CreateInferenceServiceSpec(modelService *models.Service, config *config.DeploymentConfig, currentReplicas DeploymentScale) (*kservev1beta1.InferenceService, error) { - applyDefaults(modelService, config) +func (t *InferenceServiceTemplater) CreateInferenceServiceSpec(modelService *models.Service, currentReplicas DeploymentScale) (*kservev1beta1.InferenceService, error) { + t.applyDefaults(modelService) // Identify the desired initial scale of the new deployment var initialScale *int @@ -122,7 +130,7 @@ func (t *InferenceServiceTemplater) CreateInferenceServiceSpec(modelService *mod 
} } - annotations, err := createAnnotations(modelService, config, initialScale) + annotations, err := t.createAnnotations(modelService, initialScale) if err != nil { return nil, fmt.Errorf("unable to create inference service spec: %w", err) } @@ -134,10 +142,9 @@ func (t *InferenceServiceTemplater) CreateInferenceServiceSpec(modelService *mod Annotations: annotations, } - predictorSpec := createPredictorSpec(modelService, config) - predictorSpec.TopologySpreadConstraints, err = createNewInferenceServiceTopologySpreadConstraints( + predictorSpec := t.createPredictorSpec(modelService) + predictorSpec.TopologySpreadConstraints, err = t.createNewInferenceServiceTopologySpreadConstraints( modelService, - config, kservev1beta1.PredictorComponent, ) if err != nil { @@ -156,9 +163,8 @@ func (t *InferenceServiceTemplater) CreateInferenceServiceSpec(modelService *mod if err != nil { return nil, fmt.Errorf("unable to create transformer spec: %w", err) } - inferenceService.Spec.Transformer.TopologySpreadConstraints, err = createNewInferenceServiceTopologySpreadConstraints( + inferenceService.Spec.Transformer.TopologySpreadConstraints, err = t.createNewInferenceServiceTopologySpreadConstraints( modelService, - config, kservev1beta1.TransformerComponent, ) if err != nil { @@ -169,7 +175,7 @@ func (t *InferenceServiceTemplater) CreateInferenceServiceSpec(modelService *mod return inferenceService, nil } -func createPredictorSpec(modelService *models.Service, config *config.DeploymentConfig) kservev1beta1.PredictorSpec { +func (t *InferenceServiceTemplater) createPredictorSpec(modelService *models.Service) kservev1beta1.PredictorSpec { envVars := modelService.EnvVars // Set cpu limit and memory limit to be 2x of the requests @@ -193,7 +199,7 @@ func createPredictorSpec(modelService *models.Service, config *config.Deployment tolerations := []corev1.Toleration{} if modelService.ResourceRequest.GPUName != "" && !modelService.ResourceRequest.GPURequest.IsZero() { // Look up to the GPU resource type and quantity from DeploymentConfig - for _, gpuConfig := range config.GPUs { + for _, gpuConfig := range t.deploymentConfig.GPUs { if gpuConfig.Name == modelService.ResourceRequest.GPUName { // Declare and initialize resourceType and resourceQuantity variables resourceType := corev1.ResourceName(gpuConfig.ResourceType) @@ -281,10 +287,20 @@ func createPredictorSpec(modelService *models.Service, config *config.Deployment }, }, } - case models.ModelTypePyFunc: + case models.ModelTypePyFunc, models.ModelTypePyFuncV3: envVars := models.MergeEnvVars(modelService.EnvVars, createPyFuncDefaultEnvVars(modelService)) if modelService.Protocol == protocol.UpiV1 { - envVars = append(envVars, models.EnvVar{Name: envGRPCOptions, Value: config.PyfuncGRPCOptions}) + envVars = append(envVars, models.EnvVar{Name: envGRPCOptions, Value: t.deploymentConfig.PyfuncGRPCOptions}) + } + if modelService.EnabledModelObservability && modelService.Type == models.ModelTypePyFuncV3 { + pyfuncPublisherCfg := t.deploymentConfig.PyFuncPublisher + envVars = append(envVars, models.EnvVar{Name: envPublisherEnabled, Value: strconv.FormatBool(modelService.EnabledModelObservability)}) + envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaTopic, Value: modelService.GetPredictionLogTopicForVersion()}) + envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaBrokers, Value: pyfuncPublisherCfg.Kafka.Brokers}) + envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaLinger, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.LingerMS)}) 
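This branch (the remaining acks, sampling-ratio, and additional-config appends follow immediately below) is what turns the PyFuncPublisher deployment config into PUBLISHER_* environment variables on the model container, and it fires only for pyfunc_v3 models with observability enabled. A runnable sketch of the resulting environment, using the broker and Kafka values from the pyfuncPublisherConfig fixture further down; the topic value is hypothetical, since the real one comes from GetPredictionLogTopicForVersion():

    package main

    import "fmt"

    func main() {
        // Fixture-like values; the real ones come from DeploymentConfig.PyFuncPublisher.
        brokers, lingerMS, acks, additionalCfg, samplingRatio := "kafka-broker:1111", 1000, 0, "{}", 0.01
        env := map[string]string{
            "PUBLISHER_ENABLED":         "true",                                  // strconv.FormatBool of EnabledModelObservability
            "PUBLISHER_KAFKA_TOPIC":     "merlin-project-model-1-prediction-log", // hypothetical topic name
            "PUBLISHER_KAFKA_BROKERS":   brokers,
            "PUBLISHER_KAFKA_LINGER_MS": fmt.Sprintf("%d", lingerMS),      // "1000"
            "PUBLISHER_KAFKA_ACKS":      fmt.Sprintf("%d", acks),          // "0"
            "PUBLISHER_SAMPLING_RATIO":  fmt.Sprintf("%f", samplingRatio), // "0.010000"
            "PUBLISHER_KAFKA_CONFIG":    additionalCfg,
        }
        for name, value := range env {
            fmt.Printf("%s=%s\n", name, value)
        }
    }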
+ envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaAck, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.Acks)}) + envVars = append(envVars, models.EnvVar{Name: envPublisherSamplingRatio, Value: fmt.Sprintf("%f", pyfuncPublisherCfg.SamplingRatioRate)}) + envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaConfig, Value: pyfuncPublisherCfg.Kafka.AdditionalConfig}) } predictorSpec = kservev1beta1.PredictorSpec{ PodSpec: kservev1beta1.PodSpec{ @@ -300,6 +316,7 @@ func createPredictorSpec(modelService *models.Service, config *config.Deployment }, }, } + case models.ModelTypeCustom: predictorSpec = createCustomPredictorSpec(modelService, resources, nodeSelector, tolerations) } @@ -336,7 +353,7 @@ func (t *InferenceServiceTemplater) createTransformerSpec(modelService *models.S // Put in defaults if not provided by users (user's input is used) if transformer.TransformerType == models.StandardTransformerType { - transformer.Image = t.standardTransformerConfig.ImageName + transformer.Image = t.deploymentConfig.StandardTransformer.ImageName envVars = t.enrichStandardTransformerEnvVars(modelService, envVars) } @@ -424,43 +441,45 @@ func (t *InferenceServiceTemplater) enrichStandardTransformerEnvVars(modelServic } } + standardTransformerCfg := t.deploymentConfig.StandardTransformer + // Additional env var to add addEnvVar := models.EnvVars{} - addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.DefaultFeastSource, Value: t.standardTransformerConfig.DefaultFeastSource.String()}) + addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.DefaultFeastSource, Value: standardTransformerCfg.DefaultFeastSource.String()}) // adding feast related config env variable - feastStorageConfig := t.standardTransformerConfig.ToFeastStorageConfigs() + feastStorageConfig := standardTransformerCfg.ToFeastStorageConfigs() if feastStorageConfigJsonByte, err := json.Marshal(feastStorageConfig); err == nil { addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.FeastStorageConfigs, Value: string(feastStorageConfigJsonByte)}) } // Add keepalive configuration for predictor // only pyfunc config that enforced by merlin - keepAliveModelCfg := t.standardTransformerConfig.ModelClientKeepAlive - if modelService.Protocol == protocol.UpiV1 && modelService.Type == models.ModelTypePyFunc { + keepAliveModelCfg := standardTransformerCfg.ModelClientKeepAlive + if modelService.Protocol == protocol.UpiV1 && (modelService.Type == models.ModelTypePyFunc || modelService.Type == models.ModelTypePyFuncV3) { addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.ModelGRPCKeepAliveEnabled, Value: strconv.FormatBool(keepAliveModelCfg.Enabled)}) addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.ModelGRPCKeepAliveTime, Value: keepAliveModelCfg.Time.String()}) addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.ModelGRPCKeepAliveTimeout, Value: keepAliveModelCfg.Timeout.String()}) } - keepaliveCfg := t.standardTransformerConfig.FeastServingKeepAlive + keepaliveCfg := standardTransformerCfg.FeastServingKeepAlive addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.FeastServingKeepAliveEnabled, Value: strconv.FormatBool(keepaliveCfg.Enabled)}) addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.FeastServingKeepAliveTime, Value: keepaliveCfg.Time.String()}) addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.FeastServingKeepAliveTimeout, Value: keepaliveCfg.Timeout.String()}) - addEnvVar = append(addEnvVar, 
models.EnvVar{Name: transformerpkg.FeastGRPCConnCount, Value: fmt.Sprintf("%d", t.standardTransformerConfig.FeastGPRCConnCount)}) + addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.FeastGRPCConnCount, Value: fmt.Sprintf("%d", standardTransformerCfg.FeastGPRCConnCount)}) if modelService.Protocol == protocol.UpiV1 { // add kafka configuration - kafkaCfg := t.standardTransformerConfig.Kafka + kafkaCfg := standardTransformerCfg.Kafka addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.KafkaTopic, Value: modelService.GetPredictionLogTopic()}) addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.KafkaBrokers, Value: kafkaCfg.Brokers}) addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.KafkaMaxMessageSizeBytes, Value: fmt.Sprintf("%v", kafkaCfg.MaxMessageSizeBytes)}) addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.KafkaConnectTimeoutMS, Value: fmt.Sprintf("%v", kafkaCfg.ConnectTimeoutMS)}) addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.KafkaSerialization, Value: string(kafkaCfg.SerializationFmt)}) - addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.ModelServerConnCount, Value: fmt.Sprintf("%d", t.standardTransformerConfig.ModelServerConnCount)}) + addEnvVar = append(addEnvVar, models.EnvVar{Name: transformerpkg.ModelServerConnCount, Value: fmt.Sprintf("%d", standardTransformerCfg.ModelServerConnCount)}) } - jaegerCfg := t.standardTransformerConfig.Jaeger + jaegerCfg := standardTransformerCfg.Jaeger jaegerEnvVars := []models.EnvVar{ {Name: transformerpkg.JaegerCollectorURL, Value: jaegerCfg.CollectorURL}, {Name: transformerpkg.JaegerSamplerParam, Value: jaegerCfg.SamplerParam}, @@ -520,14 +539,15 @@ func createPredictorHost(modelService *models.Service) string { return fmt.Sprintf("%s-predictor.%s:%d", modelService.Name, modelService.Namespace, defaultPredictorPort) } -func createAnnotations(modelService *models.Service, config *config.DeploymentConfig, initialScale *int) (map[string]string, error) { +func (t *InferenceServiceTemplater) createAnnotations(modelService *models.Service, initialScale *int) (map[string]string, error) { annotations := make(map[string]string) + config := t.deploymentConfig if config.QueueResourcePercentage != "" { annotations[knserving.QueueSidecarResourcePercentageAnnotationKey] = config.QueueResourcePercentage } - if modelService.Type == models.ModelTypePyFunc { + if modelService.Type == models.ModelTypePyFunc || modelService.Type == models.ModelTypePyFuncV3 { annotations[annotationPrometheusScrapeFlag] = "true" annotations[annotationPrometheusScrapePort] = fmt.Sprint(defaultHTTPPort) } @@ -594,11 +614,11 @@ func createLoggerSpec(loggerURL string, loggerConfig models.LoggerConfig) *kserv // createNewInferenceServiceTopologySpreadConstraints creates topology spread constraints for a component of a new // inference service -func createNewInferenceServiceTopologySpreadConstraints( +func (t *InferenceServiceTemplater) createNewInferenceServiceTopologySpreadConstraints( modelService *models.Service, - config *config.DeploymentConfig, component kservev1beta1.ComponentType, ) ([]corev1.TopologySpreadConstraint, error) { + config := t.deploymentConfig if len(config.TopologySpreadConstraints) == 0 { var topologySpreadConstraints []corev1.TopologySpreadConstraint return topologySpreadConstraints, nil @@ -778,12 +798,17 @@ func createPyFuncDefaultEnvVars(svc *models.Service) models.EnvVars { Name: envProtocol, Value: fmt.Sprint(svc.Protocol), }, + models.EnvVar{ + Name:
envProject, + Value: svc.Namespace, + }, } return envVars } -func applyDefaults(service *models.Service, config *config.DeploymentConfig) { +func (t *InferenceServiceTemplater) applyDefaults(service *models.Service) { // apply default resource request for model + config := t.deploymentConfig if service.ResourceRequest == nil { service.ResourceRequest = &models.ResourceRequest{ MinReplica: config.DefaultModelResourceRequests.MinReplica, diff --git a/api/cluster/resource/templater_gpu_test.go b/api/cluster/resource/templater_gpu_test.go index 5af4e17aa..469408e9c 100644 --- a/api/cluster/resource/templater_gpu_test.go +++ b/api/cluster/resource/templater_gpu_test.go @@ -80,6 +80,7 @@ func TestCreateInferenceServiceSpecWithGPU(t *testing.T) { modelSvc := &models.Service{ Name: "my-model-1", ModelName: "my-model", + Namespace: "project", ModelVersion: "1", ArtifactURI: "gs://my-artifacet", Metadata: models.Metadata{ @@ -1572,10 +1573,11 @@ func TestCreateInferenceServiceSpecWithGPU(t *testing.T) { QueueResourcePercentage: tt.resourcePercentage, PyfuncGRPCOptions: "{}", GPUs: defaultGPUsConfig, + StandardTransformer: standardTransformerConfig, } - tpl := NewInferenceServiceTemplater(standardTransformerConfig) - infSvcSpec, err := tpl.CreateInferenceServiceSpec(tt.modelSvc, deployConfig, tt.deploymentScale) + tpl := NewInferenceServiceTemplater(*deployConfig) + infSvcSpec, err := tpl.CreateInferenceServiceSpec(tt.modelSvc, tt.deploymentScale) if tt.wantErr { assert.Error(t, err) return diff --git a/api/cluster/resource/templater_test.go b/api/cluster/resource/templater_test.go index 96851fdd1..ea8ded8ac 100644 --- a/api/cluster/resource/templater_test.go +++ b/api/cluster/resource/templater_test.go @@ -16,6 +16,7 @@ package resource import ( "fmt" + "strconv" "testing" "time" @@ -152,6 +153,16 @@ var ( SerializationFmt: "protobuf", }, } + + pyfuncPublisherConfig = config.PyFuncPublisherConfig{ + Kafka: config.KafkaConfig{ + Brokers: "kafka-broker:1111", + LingerMS: 1000, + Acks: 0, + AdditionalConfig: "{}", + }, + SamplingRatioRate: 0.01, + } ) func TestCreateInferenceServiceSpec(t *testing.T) { @@ -168,6 +179,7 @@ func TestCreateInferenceServiceSpec(t *testing.T) { modelSvc := &models.Service{ Name: "my-model-1", ModelName: "my-model", + Namespace: project.Name, ModelVersion: "1", ArtifactURI: "gs://my-artifacet", Metadata: models.Metadata{ @@ -182,7 +194,8 @@ func TestCreateInferenceServiceSpec(t *testing.T) { }, }, }, - Protocol: protocol.HttpJson, + Protocol: protocol.HttpJson, + EnabledModelObservability: true, } queueResourcePercentage := "2" @@ -714,6 +727,127 @@ func TestCreateInferenceServiceSpec(t *testing.T) { }, }, }, + { + name: "pyfunc spec with model observability enabled; expect no effect", + modelSvc: &models.Service{ + Name: modelSvc.Name, + ModelName: modelSvc.ModelName, + ModelVersion: modelSvc.ModelVersion, + Namespace: project.Name, + ArtifactURI: modelSvc.ArtifactURI, + Type: models.ModelTypePyFunc, + Options: &models.ModelOption{ + PyFuncImageName: "gojek/project-model:1", + }, + EnvVars: models.EnvVars{models.EnvVar{Name: envOldDisableLivenessProbe, Value: "true"}}, + Metadata: modelSvc.Metadata, + Protocol: protocol.HttpJson, + }, + resourcePercentage: queueResourcePercentage, + deploymentScale: defaultDeploymentScale, + exp: &kservev1beta1.InferenceService{ + ObjectMeta: metav1.ObjectMeta{ + Name: modelSvc.Name, + Namespace: project.Name, + Annotations: map[string]string{ + knserving.QueueSidecarResourcePercentageAnnotationKey: queueResourcePercentage,
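+ // Prometheus scrape annotations are expected here because createAnnotations tags both pyfunc and pyfunc_v3 predictors for scraping on the default HTTP port.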
"prometheus.io/scrape": "true", + "prometheus.io/port": "8080", + kserveconstant.DeploymentMode: string(kserveconstant.Serverless), + knautoscaling.InitialScaleAnnotationKey: fmt.Sprint(testPredictorScale), + }, + Labels: map[string]string{ + "gojek.com/app": modelSvc.Metadata.App, + "gojek.com/component": models.ComponentModelVersion, + "gojek.com/environment": testEnvironmentName, + "gojek.com/orchestrator": testOrchestratorName, + "gojek.com/stream": modelSvc.Metadata.Stream, + "gojek.com/team": modelSvc.Metadata.Team, + "sample": "true", + }, + }, + Spec: kservev1beta1.InferenceServiceSpec{ + Predictor: kservev1beta1.PredictorSpec{ + PodSpec: kservev1beta1.PodSpec{ + Containers: []corev1.Container{ + { + Name: kserveconstant.InferenceServiceContainerName, + Image: "gojek/project-model:1", + Env: models.MergeEnvVars(models.EnvVars{models.EnvVar{Name: envOldDisableLivenessProbe, Value: "true"}}, + createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.HttpJson)).ToKubernetesEnvVars(), + Resources: expDefaultModelResourceRequests, + }, + }, + }, + ComponentExtensionSpec: kservev1beta1.ComponentExtensionSpec{ + MinReplicas: &defaultModelResourceRequests.MinReplica, + MaxReplicas: defaultModelResourceRequests.MaxReplica, + }, + }, + }, + }, + }, + { + name: "pyfunc_v3 spec with model observability enabled", + modelSvc: &models.Service{ + Name: modelSvc.Name, + ModelName: modelSvc.ModelName, + ModelVersion: modelSvc.ModelVersion, + Namespace: project.Name, + ArtifactURI: modelSvc.ArtifactURI, + Type: models.ModelTypePyFuncV3, + Options: &models.ModelOption{ + PyFuncImageName: "gojek/project-model:1", + }, + Metadata: modelSvc.Metadata, + Protocol: protocol.HttpJson, + EnabledModelObservability: true, + }, + resourcePercentage: queueResourcePercentage, + deploymentScale: defaultDeploymentScale, + exp: &kservev1beta1.InferenceService{ + ObjectMeta: metav1.ObjectMeta{ + Name: modelSvc.Name, + Namespace: project.Name, + Annotations: map[string]string{ + knserving.QueueSidecarResourcePercentageAnnotationKey: queueResourcePercentage, + "prometheus.io/scrape": "true", + "prometheus.io/port": "8080", + kserveconstant.DeploymentMode: string(kserveconstant.Serverless), + knautoscaling.InitialScaleAnnotationKey: fmt.Sprint(testPredictorScale), + }, + Labels: map[string]string{ + "gojek.com/app": modelSvc.Metadata.App, + "gojek.com/component": models.ComponentModelVersion, + "gojek.com/environment": testEnvironmentName, + "gojek.com/orchestrator": testOrchestratorName, + "gojek.com/stream": modelSvc.Metadata.Stream, + "gojek.com/team": modelSvc.Metadata.Team, + "sample": "true", + }, + }, + Spec: kservev1beta1.InferenceServiceSpec{ + Predictor: kservev1beta1.PredictorSpec{ + PodSpec: kservev1beta1.PodSpec{ + Containers: []corev1.Container{ + { + Name: kserveconstant.InferenceServiceContainerName, + Image: "gojek/project-model:1", + Env: models.MergeEnvVars(createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.HttpJson), + createPyFuncPublisherEnvVars(modelSvc, pyfuncPublisherConfig)).ToKubernetesEnvVars(), + Resources: expDefaultModelResourceRequests, + LivenessProbe: probeConfig, + }, + }, + }, + ComponentExtensionSpec: kservev1beta1.ComponentExtensionSpec{ + MinReplicas: &defaultModelResourceRequests.MinReplica, + MaxReplicas: defaultModelResourceRequests.MaxReplica, + }, + }, + }, + }, + }, { name: "pyfunc with liveness probe disabled", modelSvc: &models.Service{ @@ -1608,10 +1742,12 @@ func TestCreateInferenceServiceSpec(t *testing.T) { DefaultTransformerResourceRequests: 
defaultTransformerResourceRequests, QueueResourcePercentage: tt.resourcePercentage, PyfuncGRPCOptions: "{}", + StandardTransformer: standardTransformerConfig, + PyFuncPublisher: pyfuncPublisherConfig, } - tpl := NewInferenceServiceTemplater(standardTransformerConfig) - infSvcSpec, err := tpl.CreateInferenceServiceSpec(tt.modelSvc, deployConfig, tt.deploymentScale) + tpl := NewInferenceServiceTemplater(*deployConfig) + infSvcSpec, err := tpl.CreateInferenceServiceSpec(tt.modelSvc, tt.deploymentScale) if tt.wantErr { assert.Error(t, err) return @@ -2300,10 +2436,11 @@ func TestCreateInferenceServiceSpecWithTransformer(t *testing.T) { DefaultTransformerResourceRequests: defaultTransformerResourceRequests, QueueResourcePercentage: queueResourcePercentage, PyfuncGRPCOptions: "{}", + StandardTransformer: standardTransformerConfig, } - tpl := NewInferenceServiceTemplater(standardTransformerConfig) - infSvcSpec, err := tpl.CreateInferenceServiceSpec(tt.modelSvc, deployConfig, tt.deploymentScale) + tpl := NewInferenceServiceTemplater(*deployConfig) + infSvcSpec, err := tpl.CreateInferenceServiceSpec(tt.modelSvc, tt.deploymentScale) if tt.wantErr { assert.Error(t, err) return @@ -2790,10 +2927,11 @@ func TestCreateInferenceServiceSpecWithLogger(t *testing.T) { DefaultModelResourceRequests: defaultModelResourceRequests, DefaultTransformerResourceRequests: defaultTransformerResourceRequests, QueueResourcePercentage: queueResourcePercentage, + StandardTransformer: standardTransformerConfig, } - tpl := NewInferenceServiceTemplater(standardTransformerConfig) - infSvcSpec, err := tpl.CreateInferenceServiceSpec(tt.modelSvc, deployConfig, tt.deploymentScale) + tpl := NewInferenceServiceTemplater(*deployConfig) + infSvcSpec, err := tpl.CreateInferenceServiceSpec(tt.modelSvc, tt.deploymentScale) if tt.wantErr { assert.Error(t, err) return @@ -3735,10 +3873,11 @@ func TestCreateInferenceServiceSpecWithTopologySpreadConstraints(t *testing.T) { }, }, }, + StandardTransformer: standardTransformerConfig, } - tpl := NewInferenceServiceTemplater(standardTransformerConfig) - infSvcSpec, err := tpl.CreateInferenceServiceSpec(tt.modelSvc, deployConfig, tt.deploymentScale) + tpl := NewInferenceServiceTemplater(*deployConfig) + infSvcSpec, err := tpl.CreateInferenceServiceSpec(tt.modelSvc, tt.deploymentScale) if tt.wantErr { assert.Error(t, err) return @@ -3816,7 +3955,9 @@ func TestCreateTransformerSpec(t *testing.T) { {Name: transformerpkg.JaegerCollectorURL, Value: "NEW_HOST"}, // test user overwrite }, }, - &config.DeploymentConfig{}, + &config.DeploymentConfig{ + StandardTransformer: standardTransformerConfig, + }, }, &kservev1beta1.TransformerSpec{ PodSpec: kservev1beta1.PodSpec{ @@ -3916,7 +4057,7 @@ func TestCreateTransformerSpec(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tpl := NewInferenceServiceTemplater(standardTransformerConfig) + tpl := NewInferenceServiceTemplater(*tt.args.config) got := tpl.createTransformerSpec(tt.args.modelService, tt.args.transformer) assert.Equal(t, tt.want, got) }) @@ -3959,6 +4100,44 @@ func createPyFuncDefaultEnvVarsWithProtocol(svc *models.Service, protocolValue p Name: envProtocol, Value: fmt.Sprint(protocolValue), }, + models.EnvVar{ + Name: envProject, + Value: svc.Namespace, + }, + } + return envVars +} + +func createPyFuncPublisherEnvVars(svc *models.Service, pyfuncPublisher config.PyFuncPublisherConfig) models.EnvVars { + envVars := models.EnvVars{ + models.EnvVar{ + Name: envPublisherEnabled, + Value: 
strconv.FormatBool(svc.EnabledModelObservability), + }, + models.EnvVar{ + Name: envPublisherKafkaTopic, + Value: svc.GetPredictionLogTopicForVersion(), + }, + models.EnvVar{ + Name: envPublisherKafkaBrokers, + Value: pyfuncPublisher.Kafka.Brokers, + }, + models.EnvVar{ + Name: envPublisherKafkaLinger, + Value: fmt.Sprintf("%d", pyfuncPublisher.Kafka.LingerMS), + }, + models.EnvVar{ + Name: envPublisherKafkaAck, + Value: fmt.Sprintf("%d", pyfuncPublisher.Kafka.Acks), + }, + models.EnvVar{ + Name: envPublisherSamplingRatio, + Value: fmt.Sprintf("%f", pyfuncPublisher.SamplingRatioRate), + }, + models.EnvVar{ + Name: envPublisherKafkaConfig, + Value: pyfuncPublisher.Kafka.AdditionalConfig, + }, } return envVars } diff --git a/api/cmd/api/setup.go b/api/cmd/api/setup.go index fefe1ee2c..d95deac59 100644 --- a/api/cmd/api/setup.go +++ b/api/cmd/api/setup.go @@ -143,7 +143,7 @@ func initImageBuilder(cfg *config.Config) (webserviceBuilder imagebuilder.ImageB ctl, err := cluster.NewController( clusterCfg, config.DeploymentConfig{}, // We don't need the deployment config here because we're retrieving logs, not deploying models. - config.StandardTransformerConfig{}) + ) imageBuilderJanitor = imagebuilder.NewJanitor(ctl, imagebuilder.JanitorConfig{ BuildNamespace: cfg.ImageBuilderConfig.BuildNamespace, Retention: cfg.ImageBuilderConfig.Retention, @@ -184,7 +184,7 @@ func initEnvironmentService(cfg *config.Config, db *gorm.DB) service.Environment isDefaultPredictionJob = &envCfg.IsDefaultPredictionJob } - deploymentCfg := config.ParseDeploymentConfig(envCfg, cfg.PyfuncGRPCOptions) + deploymentCfg := config.ParseDeploymentConfig(envCfg, cfg) env, err := envSvc.GetEnvironment(envCfg.Name) if err != nil { @@ -423,8 +423,8 @@ func initClusterControllers(cfg *config.Config) map[string]cluster.Controller { ctl, err := cluster.NewController( clusterCfg, - config.ParseDeploymentConfig(env, cfg.PyfuncGRPCOptions), - cfg.StandardTransformerConfig) + config.ParseDeploymentConfig(env, cfg), + ) if err != nil { log.Panicf("unable to initialize cluster controller %v", err) } @@ -463,7 +463,7 @@ func initLogService(cfg *config.Config) service.LogService { ctl, err := cluster.NewController( clusterCfg, config.DeploymentConfig{}, // We don't need the deployment config here because we're retrieving logs, not deploying models. - cfg.StandardTransformerConfig) + ) if err != nil { log.Panicf("unable to initialize cluster controller %v", err) } @@ -485,8 +485,8 @@ func initLogService(cfg *config.Config) service.LogService { ctl, err := cluster.NewController( clusterCfg, - config.ParseDeploymentConfig(env, cfg.PyfuncGRPCOptions), - cfg.StandardTransformerConfig) + config.ParseDeploymentConfig(env, cfg), + ) if err != nil { log.Panicf("unable to initialize cluster controller %v", err) } diff --git a/api/config/config.go b/api/config/config.go index 0f3d441b6..68f10d78f 100644 --- a/api/config/config.go +++ b/api/config/config.go @@ -64,6 +64,7 @@ type Config struct { UI UIConfig StandardTransformerConfig StandardTransformerConfig MlflowConfig MlflowConfig + PyFuncPublisherConfig PyFuncPublisherConfig } // UIConfig stores the configuration for the UI.
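As a quick reference, here is a minimal, self-contained sketch of how the new PyFuncPublisherConfig is intended to surface as publisher env vars on a pyfunc_v3 predictor. The struct shapes mirror the config.go hunk that follows; the PUBLISHER_* names are hypothetical placeholders for the envPublisher* constants, whose actual values are not part of this diff:

```go
package main

import (
	"fmt"
	"strconv"
)

// Local mirrors of the types added in api/config/config.go; the defaults
// follow the struct tags there (Acks 0, LingerMS 100, SamplingRatioRate 0.01).
type KafkaConfig struct {
	Brokers          string
	Acks             int
	LingerMS         int
	AdditionalConfig string
}

type PyFuncPublisherConfig struct {
	Kafka             KafkaConfig
	SamplingRatioRate float64
}

func main() {
	cfg := PyFuncPublisherConfig{
		Kafka:             KafkaConfig{Brokers: "localhost:9092", Acks: 0, LingerMS: 1000, AdditionalConfig: "{}"},
		SamplingRatioRate: 0.01,
	}

	// PUBLISHER_* are hypothetical names; the templater uses the
	// envPublisher* constants for the real ones.
	envVars := [][2]string{
		{"PUBLISHER_ENABLED", strconv.FormatBool(true)},
		{"PUBLISHER_KAFKA_BROKERS", cfg.Kafka.Brokers},
		{"PUBLISHER_KAFKA_LINGER", fmt.Sprintf("%d", cfg.Kafka.LingerMS)},
		{"PUBLISHER_KAFKA_ACKS", fmt.Sprintf("%d", cfg.Kafka.Acks)},
		{"PUBLISHER_SAMPLING_RATIO", fmt.Sprintf("%f", cfg.SamplingRatioRate)},
		{"PUBLISHER_KAFKA_CONFIG", cfg.Kafka.AdditionalConfig},
	}
	for _, kv := range envVars {
		fmt.Printf("%s=%s\n", kv[0], kv[1])
	}
}
```

The sample values (Brokers, Acks: 0, SamplingRatioRate: 0.01) match the defaults declared in the KafkaConfig and PyFuncPublisherConfig struct tags in the next hunk.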
@@ -332,10 +333,19 @@ type StandardTransformerConfig struct { type KafkaConfig struct { Topic string Brokers string `validate:"required"` + Acks int `default:"0"` CompressionType string `validate:"required" default:"none"` MaxMessageSizeBytes int `validate:"required" default:"1048588"` ConnectTimeoutMS int `validate:"required" default:"1000"` SerializationFmt string `validate:"required" default:"protobuf"` + LingerMS int `validate:"required" default:"100"` + + AdditionalConfig string `validate:"required" default:"{}"` +} + +type PyFuncPublisherConfig struct { + Kafka KafkaConfig + SamplingRatioRate float64 `validate:"required" default:"0.01"` +} // SimulationFeastConfig is the Feast config intended to be used only for standard transformer simulation diff --git a/api/config/deployment.go b/api/config/deployment.go index e67aa614a..a1ad166fb 100644 --- a/api/config/deployment.go +++ b/api/config/deployment.go @@ -45,6 +45,10 @@ type DeploymentConfig struct { PodDisruptionBudget PodDisruptionBudgetConfig // GPU Config GPUs []GPUConfig + // PyFunc publisher Config + PyFuncPublisher PyFuncPublisherConfig + // Standard Transformer Config + StandardTransformer StandardTransformerConfig } type ResourceRequests struct { diff --git a/api/config/environment.go b/api/config/environment.go index b19cc3e30..d65f0005c 100644 --- a/api/config/environment.go +++ b/api/config/environment.go @@ -159,28 +159,30 @@ func InitEnvironmentConfigs(path string) ([]*EnvironmentConfig, error) { return configs, nil } -func ParseDeploymentConfig(cfg *EnvironmentConfig, pyfuncGRPCOptions string) DeploymentConfig { +func ParseDeploymentConfig(envCfg *EnvironmentConfig, cfg *Config) DeploymentConfig { return DeploymentConfig{ - DeploymentTimeout: cfg.DeploymentTimeout, - NamespaceTimeout: cfg.NamespaceTimeout, + DeploymentTimeout: envCfg.DeploymentTimeout, + NamespaceTimeout: envCfg.NamespaceTimeout, DefaultModelResourceRequests: &ResourceRequests{ - MinReplica: cfg.DefaultDeploymentConfig.MinReplica, - MaxReplica: cfg.DefaultDeploymentConfig.MaxReplica, - CPURequest: resource.MustParse(cfg.DefaultDeploymentConfig.CPURequest), - MemoryRequest: resource.MustParse(cfg.DefaultDeploymentConfig.MemoryRequest), + MinReplica: envCfg.DefaultDeploymentConfig.MinReplica, + MaxReplica: envCfg.DefaultDeploymentConfig.MaxReplica, + CPURequest: resource.MustParse(envCfg.DefaultDeploymentConfig.CPURequest), + MemoryRequest: resource.MustParse(envCfg.DefaultDeploymentConfig.MemoryRequest), }, DefaultTransformerResourceRequests: &ResourceRequests{ - MinReplica: cfg.DefaultTransformerConfig.MinReplica, - MaxReplica: cfg.DefaultTransformerConfig.MaxReplica, - CPURequest: resource.MustParse(cfg.DefaultTransformerConfig.CPURequest), - MemoryRequest: resource.MustParse(cfg.DefaultTransformerConfig.MemoryRequest), + MinReplica: envCfg.DefaultTransformerConfig.MinReplica, + MaxReplica: envCfg.DefaultTransformerConfig.MaxReplica, + CPURequest: resource.MustParse(envCfg.DefaultTransformerConfig.CPURequest), + MemoryRequest: resource.MustParse(envCfg.DefaultTransformerConfig.MemoryRequest), }, - MaxCPU: resource.MustParse(cfg.MaxCPU), - MaxMemory: resource.MustParse(cfg.MaxMemory), - TopologySpreadConstraints: cfg.TopologySpreadConstraints, - QueueResourcePercentage: cfg.QueueResourcePercentage, - PyfuncGRPCOptions: pyfuncGRPCOptions, - PodDisruptionBudget: cfg.PodDisruptionBudget, - GPUs: cfg.GPUs, + MaxCPU: resource.MustParse(envCfg.MaxCPU), + MaxMemory: resource.MustParse(envCfg.MaxMemory), + TopologySpreadConstraints:
envCfg.TopologySpreadConstraints, + QueueResourcePercentage: envCfg.QueueResourcePercentage, + PyfuncGRPCOptions: cfg.PyfuncGRPCOptions, + PodDisruptionBudget: envCfg.PodDisruptionBudget, + GPUs: envCfg.GPUs, + StandardTransformer: cfg.StandardTransformerConfig, + PyFuncPublisher: cfg.PyFuncPublisherConfig, } } diff --git a/api/config/environment_test.go b/api/config/environment_test.go index 24db43b33..e48dba1ed 100644 --- a/api/config/environment_test.go +++ b/api/config/environment_test.go @@ -146,7 +146,7 @@ func TestPDBConfig(t *testing.T) { } for _, envCfg := range cfg.ClusterConfig.EnvironmentConfigs { - deploymentCfg := ParseDeploymentConfig(envCfg, "") + deploymentCfg := ParseDeploymentConfig(envCfg, cfg) assert.Equal(t, tC.expectedPDBConfig, deploymentCfg.PodDisruptionBudget) } }) @@ -253,7 +253,7 @@ func TestGPUsConfig(t *testing.T) { } for _, envCfg := range cfg.ClusterConfig.EnvironmentConfigs { - deploymentCfg := ParseDeploymentConfig(envCfg, "") + deploymentCfg := ParseDeploymentConfig(envCfg, cfg) assert.Equal(t, tC.expectedGPUsConfig, deploymentCfg.GPUs) } }) diff --git a/api/models/model.go b/api/models/model.go index 3cf8b8da6..e0877a0fb 100644 --- a/api/models/model.go +++ b/api/models/model.go @@ -30,6 +30,7 @@ const ( ModelTypeOnnx = "onnx" ModelTypePyFuncV2 = "pyfunc_v2" ModelTypeCustom = "custom" + ModelTypePyFuncV3 = "pyfunc_v3" ) type ID int @@ -64,3 +65,7 @@ type Model struct { CreatedUpdated } + +func (m *Model) IsPyFuncModelService() bool { + return m.Type == ModelTypePyFunc || m.Type == ModelTypePyFuncV3 +} diff --git a/api/models/service.go b/api/models/service.go index a7a150c4e..4171470ee 100644 --- a/api/models/service.go +++ b/api/models/service.go @@ -51,7 +51,8 @@ type Service struct { AutoscalingPolicy *autoscaling.AutoscalingPolicy Protocol protocol.Protocol // CurrentIsvcName is the name of the current running/serving InferenceService's revision - CurrentIsvcName string + CurrentIsvcName string + EnabledModelObservability bool } func NewService(model *Model, version *Version, modelOpt *ModelOption, endpoint *VersionEndpoint) *Service { @@ -73,12 +74,13 @@ func NewService(model *Model, version *Version, modelOpt *ModelOption, endpoint Team: model.Project.Team, Labels: MergeProjectVersionLabels(model.Project.Labels, version.Labels), }, - Transformer: endpoint.Transformer, - Logger: endpoint.Logger, - DeploymentMode: endpoint.DeploymentMode, - AutoscalingPolicy: endpoint.AutoscalingPolicy, - Protocol: endpoint.Protocol, - CurrentIsvcName: endpoint.InferenceServiceName, + Transformer: endpoint.Transformer, + Logger: endpoint.Logger, + DeploymentMode: endpoint.DeploymentMode, + AutoscalingPolicy: endpoint.AutoscalingPolicy, + Protocol: endpoint.Protocol, + CurrentIsvcName: endpoint.InferenceServiceName, + EnabledModelObservability: endpoint.EnableModelObservability, } } @@ -86,6 +88,10 @@ func (svc *Service) GetPredictionLogTopic() string { return fmt.Sprintf("caraml-%s-%s-prediction-log", svc.Namespace, svc.ModelName) } +func (svc *Service) GetPredictionLogTopicForVersion() string { + return fmt.Sprintf("caraml-%s-%s-%s-prediction-log", svc.Namespace, svc.ModelName, svc.ModelVersion) +} + func MergeProjectVersionLabels(projectLabels mlp.Labels, versionLabels KV) mlp.Labels { projectLabelsMap := map[string]int{} for index, projectLabel := range projectLabels { diff --git a/api/models/version_endpoint.go b/api/models/version_endpoint.go index ca959910d..db232426f 100644 --- a/api/models/version_endpoint.go +++ b/api/models/version_endpoint.go @@ -70,7 
+70,8 @@ type VersionEndpoint struct { // AutoscalingPolicy controls the conditions when autoscaling should be triggered AutoscalingPolicy *autoscaling.AutoscalingPolicy `json:"autoscaling_policy" gorm:"autoscaling_policy"` // Protocol to be used when deploying the model - Protocol protocol.Protocol `json:"protocol" gorm:"protocol"` + Protocol protocol.Protocol `json:"protocol" gorm:"protocol"` + EnableModelObservability bool `json:"enable_model_observability" gorm:"enable_model_observability"` CreatedUpdated } diff --git a/api/queue/work/model_service_deployment.go b/api/queue/work/model_service_deployment.go index 4f8c28620..e106ac42f 100644 --- a/api/queue/work/model_service_deployment.go +++ b/api/queue/work/model_service_deployment.go @@ -184,7 +184,7 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error { func (depl *ModelServiceDeployment) generateModelOptions(ctx context.Context, model *models.Model, version *models.Version) (*models.ModelOption, error) { modelOpt := &models.ModelOption{} switch model.Type { - case models.ModelTypePyFunc: + case models.ModelTypePyFunc, models.ModelTypePyFuncV3: imageRef, err := depl.ImageBuilder.BuildImage(ctx, model.Project, model, version) if err != nil { return modelOpt, err diff --git a/api/queue/work/model_service_deployment_test.go b/api/queue/work/model_service_deployment_test.go index ea615de90..2ceca2b1d 100644 --- a/api/queue/work/model_service_deployment_test.go +++ b/api/queue/work/model_service_deployment_test.go @@ -208,6 +208,53 @@ func TestExecuteDeployment(t *testing.T) { return mockImgBuilder }, }, + { + name: "Success: empty pyfunc_v3 model", + model: &models.Model{Name: "model", Project: project, Type: models.ModelTypePyFuncV3}, + version: &models.Version{ID: 1}, + endpoint: &models.VersionEndpoint{ + EnvironmentName: env.Name, + ResourceRequest: env.DefaultResourceRequest, + VersionID: version.ID, + Namespace: project.Name, + }, + deploymentStorage: func() *mocks.DeploymentStorage { + mockStorage := &mocks.DeploymentStorage{} + mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil) + mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil) + return mockStorage + }, + storage: func() *mocks.VersionEndpointStorage { + mockStorage := &mocks.VersionEndpointStorage{} + mockStorage.On("Save", mock.Anything).Return(nil) + mockStorage.On("Get", mock.Anything).Return(&models.VersionEndpoint{ + Environment: env, + EnvironmentName: env.Name, + ResourceRequest: env.DefaultResourceRequest, + VersionID: version.ID, + Namespace: project.Name, + }, nil) + return mockStorage + }, + controller: func() *clusterMock.Controller { + ctrl := &clusterMock.Controller{} + ctrl.On("Deploy", context.Background(), mock.Anything, mock.Anything). + Return(&models.Service{ + Name: iSvcName, + Namespace: project.Name, + ServiceName: svcName, + URL: url, + Metadata: svcMetadata, + }, nil) + return ctrl + }, + imageBuilder: func() *imageBuilderMock.ImageBuilder { + mockImgBuilder := &imageBuilderMock.ImageBuilder{} + mockImgBuilder.On("BuildImage", context.Background(), project, mock.Anything, mock.Anything). 
+ Return("gojek/mymodel-1:latest", nil) + return mockImgBuilder + }, + }, { name: "Success: pytorch model with transformer", model: &models.Model{Name: "model", Project: project, Type: models.ModelTypePyTorch}, diff --git a/api/service/version_endpoint_service.go b/api/service/version_endpoint_service.go index 714d4e501..570a15b60 100644 --- a/api/service/version_endpoint_service.go +++ b/api/service/version_endpoint_service.go @@ -237,6 +237,8 @@ func (k *endpointService) override(left *models.VersionEndpoint, right *models.V left.Protocol = protocol.HttpJson } + left.EnableModelObservability = right.EnableModelObservability + return nil } @@ -292,7 +294,7 @@ func (k *endpointService) ListContainers(ctx context.Context, model *models.Mode } containers := make([]*models.Container, 0) - if model.Type == models.ModelTypePyFunc { + if model.IsPyFuncModelService() { imgBuilderContainers, err := k.imageBuilder.GetContainers(ctx, model.Project, model, version) if err != nil { return nil, err diff --git a/api/service/version_endpoint_service_test.go b/api/service/version_endpoint_service_test.go index 2ee382640..4c73531ec 100644 --- a/api/service/version_endpoint_service_test.go +++ b/api/service/version_endpoint_service_test.go @@ -198,6 +198,27 @@ func TestDeployEndpoint(t *testing.T) { }, wantDeployError: false, }, + { + name: "success: empty pyfunc_v3 model", + args: args{ + env, + &models.Model{Name: "model", Project: project, Type: models.ModelTypePyFuncV3}, + &models.Version{ID: 1}, + &models.VersionEndpoint{ + ResourceRequest: env.DefaultResourceRequest, + }, + }, + expectedEndpoint: &models.VersionEndpoint{ + DeploymentMode: deployment.ServerlessDeploymentMode, + AutoscalingPolicy: autoscaling.DefaultServerlessAutoscalingPolicy, + ResourceRequest: env.DefaultResourceRequest, + Namespace: project.Name, + URL: "", + Status: models.EndpointPending, + Protocol: protocol.HttpJson, + }, + wantDeployError: false, + }, { name: "success: empty custom model", args: args{ @@ -765,6 +786,28 @@ func TestDeployEndpoint(t *testing.T) { }, wantDeployError: false, }, + { + name: "success: pyfunc_v3 upi v1 model", + args: args{ + env, + &models.Model{Name: "model", Project: project, Type: models.ModelTypePyFuncV3}, + &models.Version{ID: 1}, + &models.VersionEndpoint{ + ResourceRequest: env.DefaultResourceRequest, + Protocol: protocol.UpiV1, + }, + }, + expectedEndpoint: &models.VersionEndpoint{ + DeploymentMode: deployment.ServerlessDeploymentMode, + AutoscalingPolicy: autoscaling.DefaultServerlessAutoscalingPolicy, + ResourceRequest: env.DefaultResourceRequest, + Namespace: project.Name, + URL: "", + Status: models.EndpointPending, + Protocol: protocol.UpiV1, + }, + wantDeployError: false, + }, } for _, tt := range tests { diff --git a/config.yaml b/config.yaml index 4bfeeb20f..0326e0523 100644 --- a/config.yaml +++ b/config.yaml @@ -75,3 +75,9 @@ StandardTransformerConfig: MlflowConfig: TrackingURL: https://caraml.dev/mlflow ArtifactServiceType: nop + +PyFuncPublisherConfig: + Kafka: + Brokers: localhost:9092 + LingerMS: 1000 + Acks: 0 diff --git a/db-migrations/34_version_endpoints_enable_observability.down.sql b/db-migrations/34_version_endpoints_enable_observability.down.sql new file mode 100644 index 000000000..6e84ac269 --- /dev/null +++ b/db-migrations/34_version_endpoints_enable_observability.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE version_endpoints +DROP COLUMN enable_model_observability; diff --git a/db-migrations/34_version_endpoints_enable_observability.up.sql 
b/db-migrations/34_version_endpoints_enable_observability.up.sql new file mode 100644 index 000000000..9b06e589c --- /dev/null +++ b/db-migrations/34_version_endpoints_enable_observability.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE version_endpoints +ADD COLUMN enable_model_observability BOOLEAN NOT NULL DEFAULT false; diff --git a/python/sdk/client/models/model.py b/python/sdk/client/models/model.py index 306af6992..1ab589462 100644 --- a/python/sdk/client/models/model.py +++ b/python/sdk/client/models/model.py @@ -10,16 +10,19 @@ Generated by: https://github.com/swagger-api/swagger-codegen.git """ + import pprint import re # noqa: F401 import six + class Model(object): """NOTE: This class is auto generated by the swagger code generator program. Do not edit the class manually. """ + """ Attributes: swagger_types (dict): The key is attribute name @@ -53,6 +56,7 @@ class Model(object): def __init__(self, id=None, project_id=None, mlflow_experiment_id=None, name=None, type=None, mlflow_url=None, endpoints=None, created_at=None, updated_at=None): # noqa: E501 """Model - a model defined in Swagger""" # noqa: E501 + self._id = None self._project_id = None self._mlflow_experiment_id = None @@ -63,6 +67,7 @@ def __init__(self, id=None, project_id=None, mlflow_experiment_id=None, name=Non self._created_at = None self._updated_at = None self.discriminator = None + if id is not None: self.id = id if project_id is not None: @@ -186,7 +191,7 @@ def type(self, type): :param type: The type of this Model. # noqa: E501 :type: str """ - allowed_values = ["xgboost", "tensorflow", "sklearn", "pytorch", "pyfunc", "pyfunc_v2", "custom"] # noqa: E501 + allowed_values = ["xgboost", "tensorflow", "sklearn", "pytorch", "pyfunc", "pyfunc_v2", "pyfunc_v3", "custom"] # noqa: E501 if type not in allowed_values: raise ValueError( "Invalid value for `type` ({0}), must be one of {1}" # noqa: E501 diff --git a/python/sdk/client/models/version_endpoint.py b/python/sdk/client/models/version_endpoint.py index 23c1f6e44..6f2b1dba5 100644 --- a/python/sdk/client/models/version_endpoint.py +++ b/python/sdk/client/models/version_endpoint.py @@ -10,16 +10,19 @@ Generated by: https://github.com/swagger-api/swagger-codegen.git """ + import pprint import re # noqa: F401 import six + class VersionEndpoint(object): """NOTE: This class is auto generated by the swagger code generator program. Do not edit the class manually. 
""" + """ Attributes: swagger_types (dict): The key is attribute name @@ -44,6 +47,7 @@ class VersionEndpoint(object): 'deployment_mode': 'DeploymentMode', 'autoscaling_policy': 'AutoscalingPolicy', 'protocol': 'Protocol', + 'enable_model_observability': 'bool', 'created_at': 'datetime', 'updated_at': 'datetime' } @@ -65,12 +69,14 @@ class VersionEndpoint(object): 'deployment_mode': 'deployment_mode', 'autoscaling_policy': 'autoscaling_policy', 'protocol': 'protocol', + 'enable_model_observability': 'enable_model_observability', 'created_at': 'created_at', 'updated_at': 'updated_at' } - def __init__(self, id=None, version_id=None, status=None, url=None, service_name=None, environment_name=None, environment=None, monitoring_url=None, message=None, resource_request=None, env_vars=None, transformer=None, logger=None, deployment_mode=None, autoscaling_policy=None, protocol=None, created_at=None, updated_at=None): # noqa: E501 + def __init__(self, id=None, version_id=None, status=None, url=None, service_name=None, environment_name=None, environment=None, monitoring_url=None, message=None, resource_request=None, env_vars=None, transformer=None, logger=None, deployment_mode=None, autoscaling_policy=None, protocol=None, enable_model_observability=None, created_at=None, updated_at=None): # noqa: E501 """VersionEndpoint - a model defined in Swagger""" # noqa: E501 + self._id = None self._version_id = None self._status = None @@ -87,9 +93,11 @@ def __init__(self, id=None, version_id=None, status=None, url=None, service_name self._deployment_mode = None self._autoscaling_policy = None self._protocol = None + self._enable_model_observability = None self._created_at = None self._updated_at = None self.discriminator = None + if id is not None: self.id = id if version_id is not None: @@ -122,6 +130,8 @@ def __init__(self, id=None, version_id=None, status=None, url=None, service_name self.autoscaling_policy = autoscaling_policy if protocol is not None: self.protocol = protocol + if enable_model_observability is not None: + self.enable_model_observability = enable_model_observability if created_at is not None: self.created_at = created_at if updated_at is not None: @@ -463,6 +473,27 @@ def protocol(self, protocol): self._protocol = protocol + @property + def enable_model_observability(self): + """Gets the enable_model_observability of this VersionEndpoint. # noqa: E501 + + + :return: The enable_model_observability of this VersionEndpoint. # noqa: E501 + :rtype: bool + """ + return self._enable_model_observability + + @enable_model_observability.setter + def enable_model_observability(self, enable_model_observability): + """Sets the enable_model_observability of this VersionEndpoint. + + + :param enable_model_observability: The enable_model_observability of this VersionEndpoint. # noqa: E501 + :type: bool + """ + + self._enable_model_observability = enable_model_observability + @property def created_at(self): """Gets the created_at of this VersionEndpoint. 
# noqa: E501 diff --git a/python/sdk/merlin/client.py b/python/sdk/merlin/client.py index 7c68ae4c2..6fe3e244a 100644 --- a/python/sdk/merlin/client.py +++ b/python/sdk/merlin/client.py @@ -259,6 +259,7 @@ def deploy( deployment_mode: DeploymentMode = None, autoscaling_policy: AutoscalingPolicy = None, protocol: Protocol = None, + enable_model_observability: bool = False ) -> VersionEndpoint: return model_version.deploy( environment_name, @@ -269,6 +270,7 @@ def deploy( deployment_mode, autoscaling_policy, protocol, + enable_model_observability ) def undeploy(self, model_version: ModelVersion, environment_name: str = None): diff --git a/python/sdk/merlin/endpoint.py b/python/sdk/merlin/endpoint.py index 2113e62f5..a523f67d0 100644 --- a/python/sdk/merlin/endpoint.py +++ b/python/sdk/merlin/endpoint.py @@ -77,6 +77,8 @@ def __init__(self, endpoint: client.VersionEndpoint, log_url: str = None): if log_url is not None: self._log_url = log_url + self._enable_model_observability = endpoint.enable_model_observability + @property def url(self): return self._url @@ -132,6 +134,10 @@ def resource_request(self) -> ResourceRequest: @property def transformer(self) -> Transformer: return self._transformer + + @property + def enable_model_observability(self) -> bool: + return self._enable_model_observability def _repr_html_(self): return f"""{self._url}""" diff --git a/python/sdk/merlin/fluent.py b/python/sdk/merlin/fluent.py index 9cef7dc81..cf56fca24 100644 --- a/python/sdk/merlin/fluent.py +++ b/python/sdk/merlin/fluent.py @@ -356,6 +356,7 @@ def deploy( deployment_mode: DeploymentMode = None, autoscaling_policy: AutoscalingPolicy = None, protocol: Protocol = None, + enable_model_observability: bool = False ) -> VersionEndpoint: """ Deploy a model version. @@ -369,6 +370,7 @@ def deploy( :param deployment_mode: mode of deployment for the endpoint (default: DeploymentMode.SERVERLESS) :param autoscaling_policy: autoscaling policy to be used for the deployment (default: None) :param protocol: protocol to be used by the deployed model (default: HTTP_JSON) + :param enable_model_observability: flag to determine whether model observability is enabled for the endpoint :return: VersionEndpoint object """ _check_active_client() @@ -383,6 +385,7 @@ def deploy( deployment_mode, autoscaling_policy, protocol, + enable_model_observability ) return _merlin_client.deploy( # type: ignore @@ -395,6 +398,7 @@ def deploy( deployment_mode, autoscaling_policy, protocol, + enable_model_observability ) diff --git a/python/sdk/merlin/model.py b/python/sdk/merlin/model.py index 7e413f573..db12fdb56 100644 --- a/python/sdk/merlin/model.py +++ b/python/sdk/merlin/model.py @@ -290,6 +290,7 @@ class ModelType(Enum): ONNX = "onnx" PYFUNC = "pyfunc" PYFUNC_V2 = "pyfunc_v2" + PYFUNC_V3 = "pyfunc_v3" CUSTOM = "custom" @@ -944,8 +945,9 @@ def log_pyfunc_model( if ( self._model.type != ModelType.PYFUNC and self._model.type != ModelType.PYFUNC_V2 + and self._model.type != ModelType.PYFUNC_V3 ): - raise ValueError("log_pyfunc_model is only for PyFunc and PyFuncV2 model") + raise ValueError("log_pyfunc_model is only for PyFunc, PyFuncV2 and PyFuncV3 models") # add/replace python version in conda to match that used to create model version conda_env = _process_conda_env(conda_env, self._python_version) @@ -985,6 +987,7 @@ def log_model(self, model_dir=None): if ( self._model.type == ModelType.PYFUNC or self._model.type == ModelType.PYFUNC_V2 + or self._model.type == ModelType.PYFUNC_V3 ): raise ValueError("use log_pyfunc_model to log pyfunc
model") @@ -1069,6 +1072,7 @@ def deploy( deployment_mode: DeploymentMode = None, autoscaling_policy: AutoscalingPolicy = None, protocol: Protocol = None, + enable_model_observability: bool = False ) -> VersionEndpoint: """ Deploy current model to MLP One of log_model, log_pytorch_model, @@ -1168,6 +1172,7 @@ def deploy( deployment_mode=target_deployment_mode, autoscaling_policy=target_autoscaling_policy, protocol=target_protocol, + enable_model_observability=enable_model_observability ) if current_endpoint is not None: # This allows a serving deployment to be updated while it is serving diff --git a/python/sdk/merlin/validation.py b/python/sdk/merlin/validation.py index 0de387444..4e0359acb 100644 --- a/python/sdk/merlin/validation.py +++ b/python/sdk/merlin/validation.py @@ -32,6 +32,7 @@ def validate_model_dir(model_type, model_dir): if model_type == ModelType.PYFUNC or \ model_type == ModelType.PYFUNC_V2 or \ + model_type == ModelType.PYFUNC_V3 or \ model_type == ModelType.CUSTOM: return diff --git a/python/sdk/test/model_test.py b/python/sdk/test/model_test.py index eb0753c5c..c04de8fce 100644 --- a/python/sdk/test/model_test.py +++ b/python/sdk/test/model_test.py @@ -124,6 +124,18 @@ protocol=cl.Protocol.UPI_V1, ) +observability_enabled_ep = cl.VersionEndpoint( + "7899", + 1, + "running", + "localhost/1", + "svc-1", + env_3.name, + env_3, + "grafana.com", + enable_model_observability=True, +) + rule_1 = cl.ModelEndpointRule( destinations=[cl.ModelEndpointRuleDestination(ep1.id, weight=100)] ) @@ -719,6 +731,55 @@ def test_deploy_with_gpu(self, version): == resource_request_with_gpu.gpu_request ) + @responses.activate + def test_deploy_with_model_observability_enabled(self, version): + responses.add( + "GET", + "/v1/environments", + body=json.dumps([env_3.to_dict()]), + status=200, + content_type="application/json", + ) + # This is the additional check which deploy makes to determine if there are any existing endpoints associated + responses.add( + "GET", + "/v1/models/1/versions/1/endpoint", + body=json.dumps([]), + status=200, + content_type="application/json", + ) + responses.add( + "POST", + "/v1/models/1/versions/1/endpoint", + body=json.dumps(observability_enabled_ep.to_dict()), + status=200, + content_type="application/json", + ) + responses.add( + "GET", + "/v1/models/1/versions/1/endpoint", + body=json.dumps([observability_enabled_ep.to_dict()]), + status=200, + content_type="application/json", + ) + responses.add( + "GET", + "/v1/models/1/versions/1/endpoint/7899", + body=json.dumps(observability_enabled_ep.to_dict()), + status=200, + content_type="application/json", + ) + + endpoint = version.deploy(environment_name=env_3.name, enable_model_observability=True) + + assert endpoint.id == observability_enabled_ep.id + assert endpoint.status.value == observability_enabled_ep.status + assert endpoint.environment_name == observability_enabled_ep.environment_name + assert endpoint.environment.cluster == env_3.cluster + assert endpoint.environment.name == env_3.name + assert endpoint.deployment_mode == DeploymentMode.SERVERLESS + assert endpoint.enable_model_observability == True + @responses.activate def test_undeploy(self, version): responses.add( diff --git a/python/sdk/test/pyfunc_integration_test.py b/python/sdk/test/pyfunc_integration_test.py index 40d2ba616..00a8e2f12 100644 --- a/python/sdk/test/pyfunc_integration_test.py +++ b/python/sdk/test/pyfunc_integration_test.py @@ -22,7 +22,8 @@ import numpy as np import pytest import xgboost as xgb -from merlin.model import 
ModelType, PyFuncModel +from merlin.model import ModelType, PyFuncModel, PyFuncV3Model +from merlin.pyfunc import ModelInput, ModelOutput, Values from sklearn import svm from sklearn.datasets import load_iris @@ -48,7 +49,6 @@ def infer(self, model_input): result_2 = self._model_2.predict_proba(inputs) return {"predictions": ((result_1 + result_2) / 2).tolist()} - class EnvVarModel(PyFuncModel): def initialize(self, artifacts): self.env_var = {} @@ -58,6 +58,38 @@ def initialize(self, artifacts): def infer(self, model_input): return self.env_var + +class ModelObservabilityModel(PyFuncV3Model): + def initialize(self, artifacts): + self._feature_names = ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)'] + self._target_names = ['setosa', 'versicolor', 'virginica'] + self._model = xgb.Booster(model_file=artifacts["xgb_model"]) + + def preprocess(self, request: dict, **kwargs) -> ModelInput: + features_data = request['instances'] + return ModelInput( + prediction_ids=["prediction_1", "prediction_2"], + features=Values( + columns=self._feature_names, + data=features_data + ) + ) + + def infer(self, model_input: ModelInput) -> ModelOutput: + dmatrix = xgb.DMatrix(model_input.features.data) + outputs = self._model.predict(dmatrix).tolist() + return ModelOutput( + prediction_ids=model_input.prediction_ids, + predictions=Values( + columns=self._target_names, + data = outputs + ) + ) + + def postprocess(self, model_output: ModelOutput, request: dict) -> dict: + return { + "predictions": model_output.predictions.data + } @pytest.mark.pyfunc @pytest.mark.integration @@ -119,6 +151,36 @@ def test_pyfunc_env_vars(integration_test_url, project_name, use_google_oauth, r merlin.undeploy(v) +@pytest.mark.pyfunc +@pytest.mark.integration +@pytest.mark.dependency() +def test_pyfunc_model_observability(integration_test_url, project_name, use_google_oauth, requests): + merlin.set_url(integration_test_url, use_google_oauth=use_google_oauth) + merlin.set_project(project_name) + merlin.set_model("pyfunc-mlobs", ModelType.PYFUNC_V3) + + undeploy_all_version() + with merlin.new_model_version() as v: + iris = load_iris() + y = iris['target'] + X = iris['data'] + xgb_path = train_xgboost_model(X, y) + + v.log_pyfunc_model(model_instance=ModelObservabilityModel(), + conda_env="test/pyfunc/env.yaml", + code_dir=["test"], + artifacts={"xgb_model": xgb_path}) + + endpoint = merlin.deploy(v, enable_model_observability=True) + + resp = requests.post(f"{endpoint.url}", json=request_json) + + assert resp.status_code == 200 + assert resp.json() is not None + assert len(resp.json()['predictions']) == len(request_json['instances']) + + merlin.undeploy(v) + # This implementation of PyFuncModel uses the old infer method (no keyword arguments). # The keywords arguments for infer() method introduced in Merlin 0.5.2. diff --git a/swagger.yaml b/swagger.yaml index 3d92b364a..1058adb62 100644 --- a/swagger.yaml +++ b/swagger.yaml @@ -1123,6 +1123,7 @@ definitions: - "pytorch" - "pyfunc" - "pyfunc_v2" + - "pyfunc_v3" - "custom" mlflow_url: type: "string" @@ -1325,6 +1326,8 @@ definitions: $ref: "#/definitions/AutoscalingPolicy" protocol: $ref: "#/definitions/Protocol" + enable_model_observability: + type: "boolean" created_at: type: "string" format: "date-time"
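To close the loop on where prediction logs land: the publisher's Kafka topic comes from the new GetPredictionLogTopicForVersion helper in api/models/service.go. A minimal, self-contained sketch of that version-scoped naming, with illustrative values:

```go
package main

import "fmt"

// predictionLogTopicForVersion mirrors Service.GetPredictionLogTopicForVersion:
// unlike the older per-model topic, it is also scoped to the model version.
func predictionLogTopicForVersion(namespace, modelName, modelVersion string) string {
	return fmt.Sprintf("caraml-%s-%s-%s-prediction-log", namespace, modelName, modelVersion)
}

func main() {
	// Illustrative values matching the test fixtures above.
	fmt.Println(predictionLogTopicForVersion("project", "my-model", "1"))
	// Output: caraml-project-my-model-1-prediction-log
}
```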