From a5a3e0d0dc9add4209d2fc9192ca389fd413ae78 Mon Sep 17 00:00:00 2001 From: Tio Pramayudi Date: Fri, 22 Nov 2024 09:52:37 +0700 Subject: [PATCH] Add more granular config for model observability during deployment --- api/api/validator.go | 2 +- api/go.mod | 6 +- api/go.sum | 12 +- api/models/model_observability.go | 57 +++++++ api/models/observability_publisher.go | 10 +- api/models/version_endpoint.go | 15 +- api/models/version_endpoint_test.go | 53 +++++++ .../observability/deployment/deployment.go | 30 +++- .../deployment/deployment_test.go | 146 +++++++++++++++++ api/pkg/observability/event/event.go | 26 ++- api/service/version_endpoint_service.go | 10 ++ api/storage/version_endpoint_storage_test.go | 64 ++++++++ ...bservability_on_version_endpoints.down.sql | 1 + ..._observability_on_version_endpoints.up.sql | 1 + docker-compose.yaml | 2 +- python/sdk/client/__init__.py | 4 + python/sdk/client/models/__init__.py | 4 + python/sdk/client/models/ground_truth_job.py | 103 ++++++++++++ .../sdk/client/models/ground_truth_source.py | 91 +++++++++++ python/sdk/client/models/model.py | 4 +- .../sdk/client/models/model_observability.py | 105 +++++++++++++ ...ediction_log_ingestion_resource_request.py | 91 +++++++++++ python/sdk/client/models/version_endpoint.py | 8 +- python/sdk/client_README.md | 4 + python/sdk/merlin/client.py | 3 + python/sdk/merlin/endpoint.py | 8 + python/sdk/merlin/fluent.py | 3 + python/sdk/merlin/model.py | 8 + python/sdk/merlin/model_observability.py | 148 ++++++++++++++++++ python/sdk/test/model_observability_test.py | 63 ++++++++ python/sdk/test/model_test.py | 87 ++++++++++ swagger.yaml | 68 ++++++++ 32 files changed, 1206 insertions(+), 31 deletions(-) create mode 100644 api/models/model_observability.go create mode 100644 db-migrations/40_model_observability_on_version_endpoints.down.sql create mode 100644 db-migrations/40_model_observability_on_version_endpoints.up.sql create mode 100644 python/sdk/client/models/ground_truth_job.py create mode 100644 python/sdk/client/models/ground_truth_source.py create mode 100644 python/sdk/client/models/model_observability.py create mode 100644 python/sdk/client/models/prediction_log_ingestion_resource_request.py create mode 100644 python/sdk/merlin/model_observability.py create mode 100644 python/sdk/test/model_observability_test.py diff --git a/api/api/validator.go b/api/api/validator.go index 1264d0e7a..f75bcd8b8 100644 --- a/api/api/validator.go +++ b/api/api/validator.go @@ -176,7 +176,7 @@ func deploymentModeValidation(prev *models.VersionEndpoint, new *models.VersionE func modelObservabilityValidation(endpoint *models.VersionEndpoint, model *models.Model) requestValidator { return newFuncValidate(func() error { - if endpoint.EnableModelObservability && !slices.Contains(supportedObservabilityModelTypes, model.Type) { + if endpoint.IsModelMonitoringEnabled() && !slices.Contains(supportedObservabilityModelTypes, model.Type) { return fmt.Errorf("%s: %w", model.Type, ErrUnsupportedObservabilityModelType) } return nil diff --git a/api/go.mod b/api/go.mod index 1f9c67111..e146c72e8 100644 --- a/api/go.mod +++ b/api/go.mod @@ -20,7 +20,7 @@ require ( github.com/confluentinc/confluent-kafka-go v1.9.2 github.com/confluentinc/confluent-kafka-go/v2 v2.0.2 github.com/coocood/freecache v1.2.3 - github.com/fatih/color v1.15.0 + github.com/fatih/color v1.18.0 github.com/feast-dev/feast/sdk/go v0.9.4 github.com/fraugster/parquet-go v0.10.0 github.com/ghodss/yaml v1.0.0 @@ -183,7 +183,7 @@ require ( github.com/magiconair/properties v1.8.5 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect @@ -229,7 +229,7 @@ require ( golang.org/x/net v0.22.0 // indirect golang.org/x/oauth2 v0.18.0 // indirect golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.18.0 // indirect + golang.org/x/sys v0.27.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/tools v0.19.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect diff --git a/api/go.sum b/api/go.sum index f608b5513..7221dcba0 100644 --- a/api/go.sum +++ b/api/go.sum @@ -280,8 +280,8 @@ github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= -github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= -github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/feast-dev/feast/sdk/go v0.9.4 h1:ChGdqbNiaBbcm/X1TRBkikAnbhJFAp0ofzs1TmBA/M4= github.com/feast-dev/feast/sdk/go v0.9.4/go.mod h1:RWG8U+ri5d9CEE6jGPwfaIr5TxNGisZvlCMwGYfmGn4= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= @@ -759,8 +759,8 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= @@ -1365,8 +1365,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220906165534-d0df966e6959/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/api/models/model_observability.go b/api/models/model_observability.go new file mode 100644 index 000000000..6a5ae0862 --- /dev/null +++ b/api/models/model_observability.go @@ -0,0 +1,57 @@ +package models + +import ( + "database/sql/driver" + "encoding/json" + "errors" + + "k8s.io/apimachinery/pkg/api/resource" +) + +// ModelObservability represents the configuration for model observability features. +type ModelObservability struct { + Enabled bool `json:"enabled"` + GroundTruthSource *GroundTruthSource `json:"ground_truth_source"` + GroundTruthJob *GroundTruthJob `json:"ground_truth_job"` + PredictionLogIngestionResourceRequest *WorkerResourceRequest `json:"prediction_log_ingestion_resource_request"` +} + +// GroundTruthSource represents the source configuration for ground truth data. +type GroundTruthSource struct { + TableURN string `json:"table_urn"` + EventTimestampColumn string `json:"event_timestamp_column"` + DWHProject string `json:"dwh_project"` +} + +// GroundTruthJob represents the configuration for a scheduled job. +type GroundTruthJob struct { + CronSchedule string `json:"cron_schedule"` + CPURequest string `json:"cpu_request"` + CPULimit *string `json:"cpu_limit"` + MemoryRequest string `json:"memory_request"` + MemoryLimit *string `json:"memory_limit"` + StartDayOffsetFromNow int `json:"start_day_offset_from_now"` + EndDayOffsetFromNow int `json:"end_day_offset_from_now"` + GracePeriodDay int `json:"grace_period_day"` + ServiceAccountSecretName string `json:"service_account_secret_name"` +} + +// WorkerResourceRequest represents the resource request for a worker (prediction log ingestion kafka consumer) deployment. +type WorkerResourceRequest struct { + CPURequest *resource.Quantity `json:"cpu_request"` + MemoryRequest *resource.Quantity `json:"memory_request"` + Replica int32 `json:"replica"` +} + +func (mlob ModelObservability) Value() (driver.Value, error) { + return json.Marshal(mlob) +} + +func (mlob *ModelObservability) Scan(value interface{}) error { + b, ok := value.([]byte) + if !ok { + return errors.New("type assertion to []byte failed") + } + + return json.Unmarshal(b, &mlob) +} diff --git a/api/models/observability_publisher.go b/api/models/observability_publisher.go index f1878f7ee..54dcd92b4 100644 --- a/api/models/observability_publisher.go +++ b/api/models/observability_publisher.go @@ -40,9 +40,10 @@ type WorkerData struct { ModelVersion string Revision int TopicSource string + ResourceRequest *WorkerResourceRequest } -func NewWorkerData(modelVersion *Version, model *Model, observabilityPublisher *ObservabilityPublisher) *WorkerData { +func NewWorkerData(modelVersion *Version, model *Model, observabilityPublisher *ObservabilityPublisher, resourceRequest *WorkerResourceRequest) *WorkerData { return &WorkerData{ ModelName: model.Name, Project: model.Project.Name, @@ -54,9 +55,10 @@ func NewWorkerData(modelVersion *Version, model *Model, observabilityPublisher * Team: model.Project.Team, Labels: model.Project.Labels, }, - ModelVersion: modelVersion.ID.String(), - Revision: observabilityPublisher.Revision, - TopicSource: getPredictionLogTopicForVersion(model.Project.Name, model.Name, modelVersion.ID.String()), + ModelVersion: modelVersion.ID.String(), + Revision: observabilityPublisher.Revision, + TopicSource: getPredictionLogTopicForVersion(model.Project.Name, model.Name, modelVersion.ID.String()), + ResourceRequest: resourceRequest, } } diff --git a/api/models/version_endpoint.go b/api/models/version_endpoint.go index a7ae549a9..7b2dc0fb2 100644 --- a/api/models/version_endpoint.go +++ b/api/models/version_endpoint.go @@ -72,8 +72,12 @@ type VersionEndpoint struct { // AutoscalingPolicy controls the conditions when autoscaling should be triggered AutoscalingPolicy *autoscaling.AutoscalingPolicy `json:"autoscaling_policy" gorm:"autoscaling_policy"` // Protocol to be used when deploying the model - Protocol protocol.Protocol `json:"protocol" gorm:"protocol"` - EnableModelObservability bool `json:"enable_model_observability" gorm:"enable_model_observability"` + Protocol protocol.Protocol `json:"protocol" gorm:"protocol"` + // EnableModelObservability flag indicate whether the version endpoint should enable model observability + // This flag will be deprecated in the future, please use ModelObservability.Enabled instead + EnableModelObservability bool `json:"enable_model_observability" gorm:"enable_model_observability"` + // ModelObservability configuration + ModelObservability *ModelObservability `json:"model_observability" gorm:"model_observability"` CreatedUpdated } @@ -127,6 +131,13 @@ func (ve *VersionEndpoint) IsServing() bool { return ve.Status == EndpointServing } +func (ve *VersionEndpoint) IsModelMonitoringEnabled() bool { + if ve.ModelObservability == nil { + return ve.EnableModelObservability + } + return ve.ModelObservability.Enabled +} + func (ve *VersionEndpoint) Hostname() string { if ve.URL == "" { return "" diff --git a/api/models/version_endpoint_test.go b/api/models/version_endpoint_test.go index 89a1cac2e..35a9776e7 100644 --- a/api/models/version_endpoint_test.go +++ b/api/models/version_endpoint_test.go @@ -236,3 +236,56 @@ func TestVersionEndpoint_Path(t *testing.T) { }) } } + +func TestVersionEndpoint_IsModelMonitoringEnabled(t *testing.T) { + tests := []struct { + name string + versionEndpoint *VersionEndpoint + want bool + }{ + { + name: "model observability is nil but enable model observability is true", + versionEndpoint: &VersionEndpoint{ + ModelObservability: nil, + EnableModelObservability: true, + }, + want: true, + }, + { + name: "model observability is nil and enable model observability is false", + versionEndpoint: &VersionEndpoint{ + ModelObservability: nil, + EnableModelObservability: false, + }, + want: false, + }, + { + name: "model observability is not nil and enabled is true", + versionEndpoint: &VersionEndpoint{ + ModelObservability: &ModelObservability{ + Enabled: true, + }, + EnableModelObservability: true, + }, + want: true, + }, + { + name: "model observability is not nil and enabled is false", + versionEndpoint: &VersionEndpoint{ + ModelObservability: &ModelObservability{ + Enabled: false, + }, + EnableModelObservability: false, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + if got := tt.versionEndpoint.IsModelMonitoringEnabled(); got != tt.want { + t.Errorf("VersionEndpoint.IsModelMonitoringEnabled() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/api/pkg/observability/deployment/deployment.go b/api/pkg/observability/deployment/deployment.go index a4bcf6f42..86bdaeca3 100644 --- a/api/pkg/observability/deployment/deployment.go +++ b/api/pkg/observability/deployment/deployment.go @@ -342,11 +342,30 @@ func (c *deployer) getLabels(data *models.WorkerData) map[string]string { return labels } +func (c *deployer) getResources(data *models.WorkerData) (corev1.ResourceList, corev1.ResourceList) { + requests := c.resourceRequest.DeepCopy() + limits := c.resourceLimit.DeepCopy() + if data.ResourceRequest != nil { + if data.ResourceRequest.CPURequest != nil { + requests[corev1.ResourceCPU] = *data.ResourceRequest.CPURequest + // remove default limits + delete(limits, corev1.ResourceCPU) + } + if memoryReq := data.ResourceRequest.MemoryRequest; memoryReq != nil { + requests[corev1.ResourceMemory] = *memoryReq + limits[corev1.ResourceMemory] = *memoryReq + } + } + return requests, limits +} + func (c *deployer) createDeploymentSpec(data *models.WorkerData, secretName string) (*appsv1.Deployment, error) { labels := c.getLabels(data) cfgVolName := "config-volume" workerContainer := "worker" + + requestsResources, limitsResources := c.getResources(data) podSpec := corev1.PodSpec{ Containers: []corev1.Container{ { @@ -361,8 +380,8 @@ func (c *deployer) createDeploymentSpec(data *models.WorkerData, secretName stri ImagePullPolicy: corev1.PullIfNotPresent, Resources: corev1.ResourceRequirements{ - Requests: c.resourceRequest, - Limits: c.resourceLimit, + Requests: requestsResources, + Limits: limitsResources, }, VolumeMounts: []corev1.VolumeMount{ { @@ -392,6 +411,11 @@ func (c *deployer) createDeploymentSpec(data *models.WorkerData, secretName stri }, } podSpecWithIdentity := enrichIdentityToPod(podSpec, c.consumerConfig.ServiceAccountSecretName, []string{workerContainer}) + numReplicas := c.consumerConfig.Replicas + if data.ResourceRequest != nil && data.ResourceRequest.Replica > 0 { + numReplicas = data.ResourceRequest.Replica + } + return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: c.getDeploymentName(data), @@ -407,7 +431,7 @@ func (c *deployer) createDeploymentSpec(data *models.WorkerData, secretName stri appLabelKey: data.Metadata.App, }, }, - Replicas: &c.consumerConfig.Replicas, + Replicas: &numReplicas, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: labels, diff --git a/api/pkg/observability/deployment/deployment_test.go b/api/pkg/observability/deployment/deployment_test.go index cba990d60..142e77bad 100644 --- a/api/pkg/observability/deployment/deployment_test.go +++ b/api/pkg/observability/deployment/deployment_test.go @@ -947,6 +947,152 @@ func Test_deployer_GetDeployedManifest(t *testing.T) { } } +func toQuantityPointer(quantity resource.Quantity) *resource.Quantity { + return &quantity +} + +func Test_deployer_getResources(t *testing.T) { + testCases := []struct { + name string + deployer *deployer + data *models.WorkerData + expectedRequestResources corev1.ResourceList + expectedLimitResources corev1.ResourceList + }{ + { + name: "worker data doesn't have resource request and limit hence using default", + deployer: &deployer{ + resourceRequest: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + resourceLimit: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + data: &models.WorkerData{}, + expectedRequestResources: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + expectedLimitResources: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + { + name: "worker data have cpu and memory resource request", + deployer: &deployer{ + resourceRequest: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + resourceLimit: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + data: &models.WorkerData{ + ResourceRequest: &models.WorkerResourceRequest{ + CPURequest: toQuantityPointer(resource.MustParse("2")), + MemoryRequest: toQuantityPointer(resource.MustParse("2Gi")), + }, + }, + expectedRequestResources: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + expectedLimitResources: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + { + name: "worker data have cpu and memory resource request, default doesn't have cpu limit", + deployer: &deployer{ + resourceRequest: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + resourceLimit: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + data: &models.WorkerData{ + ResourceRequest: &models.WorkerResourceRequest{ + CPURequest: toQuantityPointer(resource.MustParse("2")), + MemoryRequest: toQuantityPointer(resource.MustParse("2Gi")), + }, + }, + expectedRequestResources: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + expectedLimitResources: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + { + name: "worker data have cpu resource request but not memory", + deployer: &deployer{ + resourceRequest: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + resourceLimit: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + data: &models.WorkerData{ + ResourceRequest: &models.WorkerResourceRequest{ + CPURequest: toQuantityPointer(resource.MustParse("2")), + }, + }, + expectedRequestResources: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + expectedLimitResources: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + { + name: "worker data have memory resource request but not cpu", + deployer: &deployer{ + resourceRequest: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + resourceLimit: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + data: &models.WorkerData{ + ResourceRequest: &models.WorkerResourceRequest{ + MemoryRequest: toQuantityPointer(resource.MustParse("2Gi")), + }, + }, + expectedRequestResources: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + expectedLimitResources: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + } + for _, tC := range testCases { + t.Run(tC.name, func(t *testing.T) { + requestResource, limitResource := tC.deployer.getResources(tC.data) + assert.Equal(t, tC.expectedRequestResources, requestResource) + assert.Equal(t, tC.expectedLimitResources, limitResource) + }) + } +} + func prependGetSecretReactor(t *testing.T, secretAPI *fakecorev1.FakeSecrets, secretRet *corev1.Secret, expectedErr error) { secretAPI.Fake.PrependReactor(getMethod, secretResource, func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { actualAction, ok := action.(ktesting.GetAction) diff --git a/api/pkg/observability/event/event.go b/api/pkg/observability/event/event.go index 3a6a38ac2..eab519f5d 100644 --- a/api/pkg/observability/event/event.go +++ b/api/pkg/observability/event/event.go @@ -65,7 +65,7 @@ func (e *eventProducer) ModelEndpointChangeEvent(modelEndpoint *models.ModelEndp return err } - return e.enqueueJob(version, model, publisher, models.UndeployPublisher) + return e.enqueueJob(version, model, publisher, models.UndeployPublisher, nil) } versionEndpoint := modelEndpoint.GetVersionEndpoint() @@ -84,7 +84,12 @@ func (e *eventProducer) ModelEndpointChangeEvent(modelEndpoint *models.ModelEndp publisher.VersionID = versionEndpoint.VersionID publisher.ModelSchemaSpec = version.ModelSchema.Spec - return e.enqueueJob(version, model, publisher, models.DeployPublisher) + var workerResourceRequest *models.WorkerResourceRequest + if versionEndpoint.ModelObservability != nil && versionEndpoint.ModelObservability.PredictionLogIngestionResourceRequest != nil { + workerResourceRequest = versionEndpoint.ModelObservability.PredictionLogIngestionResourceRequest + } + + return e.enqueueJob(version, model, publisher, models.DeployPublisher, workerResourceRequest) } func (e *eventProducer) VersionEndpointChangeEvent(versionEndpoint *models.VersionEndpoint, model *models.Model) error { @@ -105,7 +110,7 @@ func (e *eventProducer) VersionEndpointChangeEvent(versionEndpoint *models.Versi } // Undeploy if version endpoint observability is false - if !versionEndpoint.EnableModelObservability { + if !versionEndpoint.IsModelMonitoringEnabled() { if publisher == nil || publisher.Status == models.Terminated { return nil } @@ -113,7 +118,7 @@ func (e *eventProducer) VersionEndpointChangeEvent(versionEndpoint *models.Versi if err != nil { return err } - return e.enqueueJob(version, model, publisher, models.UndeployPublisher) + return e.enqueueJob(version, model, publisher, models.UndeployPublisher, nil) } version, err := e.findVersionWithModelSchema(ctx, versionEndpoint.VersionID, model.ID) @@ -130,7 +135,12 @@ func (e *eventProducer) VersionEndpointChangeEvent(versionEndpoint *models.Versi publisher.VersionID = versionEndpoint.VersionID publisher.ModelSchemaSpec = version.ModelSchema.Spec - return e.enqueueJob(version, model, publisher, models.DeployPublisher) + + var workerResourceRequest *models.WorkerResourceRequest + if versionEndpoint.ModelObservability != nil && versionEndpoint.ModelObservability.PredictionLogIngestionResourceRequest != nil { + workerResourceRequest = versionEndpoint.ModelObservability.PredictionLogIngestionResourceRequest + } + return e.enqueueJob(version, model, publisher, models.DeployPublisher, workerResourceRequest) } func isUndeployAction(modelEndpoint *models.ModelEndpoint) bool { @@ -141,7 +151,7 @@ func isUndeployAction(modelEndpoint *models.ModelEndpoint) bool { return false } destination := modelEndpoint.Rule.Destination[0] - return !destination.VersionEndpoint.EnableModelObservability + return !destination.VersionEndpoint.IsModelMonitoringEnabled() } func (e *eventProducer) findVersionWithModelSchema(ctx context.Context, versionID models.ID, modelID models.ID) (*models.Version, error) { @@ -155,7 +165,7 @@ func (e *eventProducer) findVersionWithModelSchema(ctx context.Context, versionI return version, nil } -func (e *eventProducer) enqueueJob(version *models.Version, model *models.Model, publisher *models.ObservabilityPublisher, actionType models.ActionType) error { +func (e *eventProducer) enqueueJob(version *models.Version, model *models.Model, publisher *models.ObservabilityPublisher, actionType models.ActionType, workerResourceRequest *models.WorkerResourceRequest) error { publisher.Status = models.Pending if version.ModelSchema != nil { publisher.ModelSchemaSpec = version.ModelSchema.Spec @@ -182,7 +192,7 @@ func (e *eventProducer) enqueueJob(version *models.Version, model *models.Model, dataArgKey: models.ObservabilityPublisherJob{ ActionType: actionType, Publisher: publisher, - WorkerData: models.NewWorkerData(version, model, publisher), + WorkerData: models.NewWorkerData(version, model, publisher, workerResourceRequest), }, }, }) diff --git a/api/service/version_endpoint_service.go b/api/service/version_endpoint_service.go index eee58a608..73429d240 100644 --- a/api/service/version_endpoint_service.go +++ b/api/service/version_endpoint_service.go @@ -257,6 +257,16 @@ func (k *endpointService) override(left *models.VersionEndpoint, right *models.V } left.EnableModelObservability = right.EnableModelObservability && model.ObservabilitySupported + if right.ModelObservability != nil { + left.ModelObservability = right.ModelObservability + left.ModelObservability.Enabled = right.ModelObservability.Enabled && model.ObservabilitySupported + } + // for older sdk + if left.EnableModelObservability && right.ModelObservability == nil { + left.ModelObservability = &models.ModelObservability{ + Enabled: true, + } + } return nil } diff --git a/api/storage/version_endpoint_storage_test.go b/api/storage/version_endpoint_storage_test.go index 79d19d74e..bb663ba55 100644 --- a/api/storage/version_endpoint_storage_test.go +++ b/api/storage/version_endpoint_storage_test.go @@ -91,6 +91,46 @@ func TestVersionEndpointsStorage_GetTransformer(t *testing.T) { }) } +func TestVersionEndpointsStorage_GetModelObservability(t *testing.T) { + database.WithTestDatabase(t, func(t *testing.T, db *gorm.DB) { + endpoints := populateVersionEndpointTable(db) + endpointSvc := NewVersionEndpointStorage(db) + + actualEndpoint, err := endpointSvc.Get(endpoints[2].ID) + + assert.NoError(t, err) + assert.NotNil(t, actualEndpoint) + modelObservability := actualEndpoint.ModelObservability + assert.NotNil(t, modelObservability) + assert.Equal(t, true, modelObservability.Enabled) + expectedGroundTruthSource := &models.GroundTruthSource{ + TableURN: "table_urn", + EventTimestampColumn: "event_timestamp", + DWHProject: "dwh_project", + } + assert.Equal(t, expectedGroundTruthSource, modelObservability.GroundTruthSource) + expectedGroundTruthob := &models.GroundTruthJob{ + CronSchedule: "0 0 * * *", + CPURequest: "1", + CPULimit: nil, + MemoryRequest: "1Gi", + MemoryLimit: nil, + StartDayOffsetFromNow: 2, + EndDayOffsetFromNow: 1, + GracePeriodDay: 3, + ServiceAccountSecretName: "service_account_secret_name", + } + assert.Equal(t, expectedGroundTruthob, modelObservability.GroundTruthJob) + + expectedPredictionLogIngestionResourceRequest := &models.WorkerResourceRequest{ + CPURequest: "1", + MemoryRequest: "1Gi", + Replica: 1, + } + assert.Equal(t, expectedPredictionLogIngestionResourceRequest, modelObservability.PredictionLogIngestionResourceRequest) + }) +} + func TestVersionEndpointsStorage_Save(t *testing.T) { database.WithTestDatabase(t, func(t *testing.T, db *gorm.DB) { endpoints := populateVersionEndpointTable(db) @@ -184,6 +224,30 @@ func populateVersionEndpointTable(db *gorm.DB) []*models.VersionEndpoint { Image: "ghcr.io/caraml-dev/merlin-transformer-test", }, DeploymentMode: deployment.ServerlessDeploymentMode, + ModelObservability: &models.ModelObservability{ + Enabled: true, + GroundTruthSource: &models.GroundTruthSource{ + TableURN: "table_urn", + EventTimestampColumn: "event_timestamp", + DWHProject: "dwh_project", + }, + GroundTruthJob: &models.GroundTruthJob{ + CronSchedule: "0 0 * * *", + CPURequest: "1", + CPULimit: nil, + MemoryRequest: "1Gi", + MemoryLimit: nil, + StartDayOffsetFromNow: 2, + EndDayOffsetFromNow: 1, + GracePeriodDay: 3, + ServiceAccountSecretName: "service_account_secret_name", + }, + PredictionLogIngestionResourceRequest: &models.WorkerResourceRequest{ + CPURequest: "1", + MemoryRequest: "1Gi", + Replica: 1, + }, + }, } db.Create(&ep3) return []*models.VersionEndpoint{&ep1, &ep2, &ep3} diff --git a/db-migrations/40_model_observability_on_version_endpoints.down.sql b/db-migrations/40_model_observability_on_version_endpoints.down.sql new file mode 100644 index 000000000..9956e3c7f --- /dev/null +++ b/db-migrations/40_model_observability_on_version_endpoints.down.sql @@ -0,0 +1 @@ +ALTER TABLE version_endpoints DROP COLUMN model_observability; diff --git a/db-migrations/40_model_observability_on_version_endpoints.up.sql b/db-migrations/40_model_observability_on_version_endpoints.up.sql new file mode 100644 index 000000000..f6305acb2 --- /dev/null +++ b/db-migrations/40_model_observability_on_version_endpoints.up.sql @@ -0,0 +1 @@ +ALTER TABLE version_endpoints ADD COLUMN model_observability jsonb; \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index dbebe54fd..627bb5195 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -version: '3' services: postgres: image: bitnami/postgresql:latest @@ -22,6 +21,7 @@ services: - POSTGRESQL_USERNAME=merlin - POSTGRESQL_PASSWORD=merlin - POSTGRESQL_DATABASE=merlin + - POSTGRESQL_REPLICATION_USE_PASSFILE=false migrations: depends_on: diff --git a/python/sdk/client/__init__.py b/python/sdk/client/__init__.py index 2487f4b34..a0920ea1f 100644 --- a/python/sdk/client/__init__.py +++ b/python/sdk/client/__init__.py @@ -58,6 +58,8 @@ from client.models.file_format import FileFormat from client.models.gpu_config import GPUConfig from client.models.gpu_toleration import GPUToleration +from client.models.ground_truth_job import GroundTruthJob +from client.models.ground_truth_source import GroundTruthSource from client.models.image_building_job_state import ImageBuildingJobState from client.models.image_building_job_status import ImageBuildingJobStatus from client.models.label import Label @@ -73,6 +75,7 @@ from client.models.model_endpoint_alert_condition import ModelEndpointAlertCondition from client.models.model_endpoint_rule import ModelEndpointRule from client.models.model_endpoint_rule_destination import ModelEndpointRuleDestination +from client.models.model_observability import ModelObservability from client.models.model_prediction_config import ModelPredictionConfig from client.models.model_prediction_output import ModelPredictionOutput from client.models.model_prediction_output_class import ModelPredictionOutputClass @@ -89,6 +92,7 @@ from client.models.prediction_job_config_model import PredictionJobConfigModel from client.models.prediction_job_config_model_result import PredictionJobConfigModelResult from client.models.prediction_job_resource_request import PredictionJobResourceRequest +from client.models.prediction_log_ingestion_resource_request import PredictionLogIngestionResourceRequest from client.models.prediction_logger_config import PredictionLoggerConfig from client.models.project import Project from client.models.protocol import Protocol diff --git a/python/sdk/client/models/__init__.py b/python/sdk/client/models/__init__.py index 7f387a5bd..689a356b5 100644 --- a/python/sdk/client/models/__init__.py +++ b/python/sdk/client/models/__init__.py @@ -29,6 +29,8 @@ from client.models.file_format import FileFormat from client.models.gpu_config import GPUConfig from client.models.gpu_toleration import GPUToleration +from client.models.ground_truth_job import GroundTruthJob +from client.models.ground_truth_source import GroundTruthSource from client.models.image_building_job_state import ImageBuildingJobState from client.models.image_building_job_status import ImageBuildingJobStatus from client.models.label import Label @@ -44,6 +46,7 @@ from client.models.model_endpoint_alert_condition import ModelEndpointAlertCondition from client.models.model_endpoint_rule import ModelEndpointRule from client.models.model_endpoint_rule_destination import ModelEndpointRuleDestination +from client.models.model_observability import ModelObservability from client.models.model_prediction_config import ModelPredictionConfig from client.models.model_prediction_output import ModelPredictionOutput from client.models.model_prediction_output_class import ModelPredictionOutputClass @@ -60,6 +63,7 @@ from client.models.prediction_job_config_model import PredictionJobConfigModel from client.models.prediction_job_config_model_result import PredictionJobConfigModelResult from client.models.prediction_job_resource_request import PredictionJobResourceRequest +from client.models.prediction_log_ingestion_resource_request import PredictionLogIngestionResourceRequest from client.models.prediction_logger_config import PredictionLoggerConfig from client.models.project import Project from client.models.protocol import Protocol diff --git a/python/sdk/client/models/ground_truth_job.py b/python/sdk/client/models/ground_truth_job.py new file mode 100644 index 000000000..0859810a7 --- /dev/null +++ b/python/sdk/client/models/ground_truth_job.py @@ -0,0 +1,103 @@ +# coding: utf-8 + +""" + Merlin + + API Guide for accessing Merlin's model management, deployment, and serving functionalities + + The version of the OpenAPI document: 0.14.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +from __future__ import annotations +import pprint +import re # noqa: F401 +import json + + +from typing import Any, ClassVar, Dict, List, Optional +from pydantic import BaseModel, StrictInt, StrictStr +try: + from typing import Self +except ImportError: + from typing_extensions import Self + +class GroundTruthJob(BaseModel): + """ + GroundTruthJob + """ # noqa: E501 + cron_schedule: StrictStr + cpu_request: Optional[StrictStr] = None + cpu_limit: Optional[StrictStr] = None + memory_request: Optional[StrictStr] = None + memory_limit: Optional[StrictStr] = None + start_day_offset_from_now: StrictInt + end_day_offset_from_now: StrictInt + grace_period_day: Optional[StrictInt] = None + service_account_secret_name: StrictStr + __properties: ClassVar[List[str]] = ["cron_schedule", "cpu_request", "cpu_limit", "memory_request", "memory_limit", "start_day_offset_from_now", "end_day_offset_from_now", "grace_period_day", "service_account_secret_name"] + + model_config = { + "populate_by_name": True, + "validate_assignment": True + } + + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Self: + """Create an instance of GroundTruthJob from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. + """ + _dict = self.model_dump( + by_alias=True, + exclude={ + }, + exclude_none=True, + ) + return _dict + + @classmethod + def from_dict(cls, obj: Dict) -> Self: + """Create an instance of GroundTruthJob from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate({ + "cron_schedule": obj.get("cron_schedule"), + "cpu_request": obj.get("cpu_request"), + "cpu_limit": obj.get("cpu_limit"), + "memory_request": obj.get("memory_request"), + "memory_limit": obj.get("memory_limit"), + "start_day_offset_from_now": obj.get("start_day_offset_from_now"), + "end_day_offset_from_now": obj.get("end_day_offset_from_now"), + "grace_period_day": obj.get("grace_period_day"), + "service_account_secret_name": obj.get("service_account_secret_name") + }) + return _obj + + diff --git a/python/sdk/client/models/ground_truth_source.py b/python/sdk/client/models/ground_truth_source.py new file mode 100644 index 000000000..0d7807f5a --- /dev/null +++ b/python/sdk/client/models/ground_truth_source.py @@ -0,0 +1,91 @@ +# coding: utf-8 + +""" + Merlin + + API Guide for accessing Merlin's model management, deployment, and serving functionalities + + The version of the OpenAPI document: 0.14.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +from __future__ import annotations +import pprint +import re # noqa: F401 +import json + + +from typing import Any, ClassVar, Dict, List +from pydantic import BaseModel, StrictStr +try: + from typing import Self +except ImportError: + from typing_extensions import Self + +class GroundTruthSource(BaseModel): + """ + GroundTruthSource + """ # noqa: E501 + table_urn: StrictStr + event_timestamp_column: StrictStr + dwh_project: StrictStr + __properties: ClassVar[List[str]] = ["table_urn", "event_timestamp_column", "dwh_project"] + + model_config = { + "populate_by_name": True, + "validate_assignment": True + } + + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Self: + """Create an instance of GroundTruthSource from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. + """ + _dict = self.model_dump( + by_alias=True, + exclude={ + }, + exclude_none=True, + ) + return _dict + + @classmethod + def from_dict(cls, obj: Dict) -> Self: + """Create an instance of GroundTruthSource from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate({ + "table_urn": obj.get("table_urn"), + "event_timestamp_column": obj.get("event_timestamp_column"), + "dwh_project": obj.get("dwh_project") + }) + return _obj + + diff --git a/python/sdk/client/models/model.py b/python/sdk/client/models/model.py index fee675bf3..0f41fbeb6 100644 --- a/python/sdk/client/models/model.py +++ b/python/sdk/client/models/model.py @@ -47,8 +47,8 @@ def type_validate_enum(cls, value): if value is None: return value - if value not in ('xgboost', 'tensorflow', 'sklearn', 'pytorch', 'pyfunc', 'pyfunc_v2', 'pyfunc_v3', 'custom'): - raise ValueError("must be one of enum values ('xgboost', 'tensorflow', 'sklearn', 'pytorch', 'pyfunc', 'pyfunc_v2', 'pyfunc_v3', 'custom')") + if value not in ('xgboost', 'tensorflow', 'sklearn', 'pytorch', 'pyfunc', 'pyfunc_v2', 'custom'): + raise ValueError("must be one of enum values ('xgboost', 'tensorflow', 'sklearn', 'pytorch', 'pyfunc', 'pyfunc_v2', 'custom')") return value model_config = { diff --git a/python/sdk/client/models/model_observability.py b/python/sdk/client/models/model_observability.py new file mode 100644 index 000000000..12588f956 --- /dev/null +++ b/python/sdk/client/models/model_observability.py @@ -0,0 +1,105 @@ +# coding: utf-8 + +""" + Merlin + + API Guide for accessing Merlin's model management, deployment, and serving functionalities + + The version of the OpenAPI document: 0.14.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +from __future__ import annotations +import pprint +import re # noqa: F401 +import json + + +from typing import Any, ClassVar, Dict, List, Optional +from pydantic import BaseModel, StrictBool +from client.models.ground_truth_job import GroundTruthJob +from client.models.ground_truth_source import GroundTruthSource +from client.models.prediction_log_ingestion_resource_request import PredictionLogIngestionResourceRequest +try: + from typing import Self +except ImportError: + from typing_extensions import Self + +class ModelObservability(BaseModel): + """ + ModelObservability + """ # noqa: E501 + enabled: StrictBool + ground_truth_source: Optional[GroundTruthSource] = None + ground_truth_job: Optional[GroundTruthJob] = None + prediction_log_ingestion_resource_request: Optional[PredictionLogIngestionResourceRequest] = None + __properties: ClassVar[List[str]] = ["enabled", "ground_truth_source", "ground_truth_job", "prediction_log_ingestion_resource_request"] + + model_config = { + "populate_by_name": True, + "validate_assignment": True + } + + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Self: + """Create an instance of ModelObservability from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. + """ + _dict = self.model_dump( + by_alias=True, + exclude={ + }, + exclude_none=True, + ) + # override the default output from pydantic by calling `to_dict()` of ground_truth_source + if self.ground_truth_source: + _dict['ground_truth_source'] = self.ground_truth_source.to_dict() + # override the default output from pydantic by calling `to_dict()` of ground_truth_job + if self.ground_truth_job: + _dict['ground_truth_job'] = self.ground_truth_job.to_dict() + # override the default output from pydantic by calling `to_dict()` of prediction_log_ingestion_resource_request + if self.prediction_log_ingestion_resource_request: + _dict['prediction_log_ingestion_resource_request'] = self.prediction_log_ingestion_resource_request.to_dict() + return _dict + + @classmethod + def from_dict(cls, obj: Dict) -> Self: + """Create an instance of ModelObservability from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate({ + "enabled": obj.get("enabled"), + "ground_truth_source": GroundTruthSource.from_dict(obj.get("ground_truth_source")) if obj.get("ground_truth_source") is not None else None, + "ground_truth_job": GroundTruthJob.from_dict(obj.get("ground_truth_job")) if obj.get("ground_truth_job") is not None else None, + "prediction_log_ingestion_resource_request": PredictionLogIngestionResourceRequest.from_dict(obj.get("prediction_log_ingestion_resource_request")) if obj.get("prediction_log_ingestion_resource_request") is not None else None + }) + return _obj + + diff --git a/python/sdk/client/models/prediction_log_ingestion_resource_request.py b/python/sdk/client/models/prediction_log_ingestion_resource_request.py new file mode 100644 index 000000000..ba49d2744 --- /dev/null +++ b/python/sdk/client/models/prediction_log_ingestion_resource_request.py @@ -0,0 +1,91 @@ +# coding: utf-8 + +""" + Merlin + + API Guide for accessing Merlin's model management, deployment, and serving functionalities + + The version of the OpenAPI document: 0.14.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +from __future__ import annotations +import pprint +import re # noqa: F401 +import json + + +from typing import Any, ClassVar, Dict, List, Optional +from pydantic import BaseModel, StrictInt, StrictStr +try: + from typing import Self +except ImportError: + from typing_extensions import Self + +class PredictionLogIngestionResourceRequest(BaseModel): + """ + PredictionLogIngestionResourceRequest + """ # noqa: E501 + cpu_request: Optional[StrictStr] = None + memory_request: Optional[StrictStr] = None + replica: StrictInt + __properties: ClassVar[List[str]] = ["cpu_request", "memory_request", "replica"] + + model_config = { + "populate_by_name": True, + "validate_assignment": True + } + + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Self: + """Create an instance of PredictionLogIngestionResourceRequest from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. + """ + _dict = self.model_dump( + by_alias=True, + exclude={ + }, + exclude_none=True, + ) + return _dict + + @classmethod + def from_dict(cls, obj: Dict) -> Self: + """Create an instance of PredictionLogIngestionResourceRequest from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate({ + "cpu_request": obj.get("cpu_request"), + "memory_request": obj.get("memory_request"), + "replica": obj.get("replica") + }) + return _obj + + diff --git a/python/sdk/client/models/version_endpoint.py b/python/sdk/client/models/version_endpoint.py index bb3d35f4f..ad8ddf9ac 100644 --- a/python/sdk/client/models/version_endpoint.py +++ b/python/sdk/client/models/version_endpoint.py @@ -26,6 +26,7 @@ from client.models.env_var import EnvVar from client.models.environment import Environment from client.models.logger import Logger +from client.models.model_observability import ModelObservability from client.models.protocol import Protocol from client.models.resource_request import ResourceRequest from client.models.transformer import Transformer @@ -56,9 +57,10 @@ class VersionEndpoint(BaseModel): autoscaling_policy: Optional[AutoscalingPolicy] = None protocol: Optional[Protocol] = None enable_model_observability: Optional[StrictBool] = None + model_observability: Optional[ModelObservability] = None created_at: Optional[datetime] = None updated_at: Optional[datetime] = None - __properties: ClassVar[List[str]] = ["id", "version_id", "status", "url", "service_name", "environment_name", "environment", "monitoring_url", "message", "resource_request", "image_builder_resource_request", "env_vars", "transformer", "logger", "deployment_mode", "autoscaling_policy", "protocol", "enable_model_observability", "created_at", "updated_at"] + __properties: ClassVar[List[str]] = ["id", "version_id", "status", "url", "service_name", "environment_name", "environment", "monitoring_url", "message", "resource_request", "image_builder_resource_request", "env_vars", "transformer", "logger", "deployment_mode", "autoscaling_policy", "protocol", "enable_model_observability", "model_observability", "created_at", "updated_at"] model_config = { "populate_by_name": True, @@ -121,6 +123,9 @@ def to_dict(self) -> Dict[str, Any]: # override the default output from pydantic by calling `to_dict()` of autoscaling_policy if self.autoscaling_policy: _dict['autoscaling_policy'] = self.autoscaling_policy.to_dict() + # override the default output from pydantic by calling `to_dict()` of model_observability + if self.model_observability: + _dict['model_observability'] = self.model_observability.to_dict() return _dict @classmethod @@ -151,6 +156,7 @@ def from_dict(cls, obj: Dict) -> Self: "autoscaling_policy": AutoscalingPolicy.from_dict(obj.get("autoscaling_policy")) if obj.get("autoscaling_policy") is not None else None, "protocol": obj.get("protocol"), "enable_model_observability": obj.get("enable_model_observability"), + "model_observability": ModelObservability.from_dict(obj.get("model_observability")) if obj.get("model_observability") is not None else None, "created_at": obj.get("created_at"), "updated_at": obj.get("updated_at") }) diff --git a/python/sdk/client_README.md b/python/sdk/client_README.md index e051f0ea3..a11fa150c 100644 --- a/python/sdk/client_README.md +++ b/python/sdk/client_README.md @@ -147,6 +147,8 @@ Class | Method | HTTP request | Description - [FileFormat](client/docs/FileFormat.md) - [GPUConfig](client/docs/GPUConfig.md) - [GPUToleration](client/docs/GPUToleration.md) + - [GroundTruthJob](client/docs/GroundTruthJob.md) + - [GroundTruthSource](client/docs/GroundTruthSource.md) - [ImageBuildingJobState](client/docs/ImageBuildingJobState.md) - [ImageBuildingJobStatus](client/docs/ImageBuildingJobStatus.md) - [Label](client/docs/Label.md) @@ -162,6 +164,7 @@ Class | Method | HTTP request | Description - [ModelEndpointAlertCondition](client/docs/ModelEndpointAlertCondition.md) - [ModelEndpointRule](client/docs/ModelEndpointRule.md) - [ModelEndpointRuleDestination](client/docs/ModelEndpointRuleDestination.md) + - [ModelObservability](client/docs/ModelObservability.md) - [ModelPredictionConfig](client/docs/ModelPredictionConfig.md) - [ModelPredictionOutput](client/docs/ModelPredictionOutput.md) - [ModelPredictionOutputClass](client/docs/ModelPredictionOutputClass.md) @@ -178,6 +181,7 @@ Class | Method | HTTP request | Description - [PredictionJobConfigModel](client/docs/PredictionJobConfigModel.md) - [PredictionJobConfigModelResult](client/docs/PredictionJobConfigModelResult.md) - [PredictionJobResourceRequest](client/docs/PredictionJobResourceRequest.md) + - [PredictionLogIngestionResourceRequest](client/docs/PredictionLogIngestionResourceRequest.md) - [PredictionLoggerConfig](client/docs/PredictionLoggerConfig.md) - [Project](client/docs/Project.md) - [Protocol](client/docs/Protocol.md) diff --git a/python/sdk/merlin/client.py b/python/sdk/merlin/client.py index 252b210ac..e86fd3b1b 100644 --- a/python/sdk/merlin/client.py +++ b/python/sdk/merlin/client.py @@ -45,6 +45,7 @@ from merlin.util import valid_name_check from merlin.version import VERSION from merlin.version_image import VersionImage +from merlin.model_observability import ModelObservability class MerlinClient: @@ -277,6 +278,7 @@ def deploy( autoscaling_policy: AutoscalingPolicy = None, protocol: Protocol = None, enable_model_observability: bool = False, + model_observability: Optional[ModelObservability] = None ) -> VersionEndpoint: return model_version.deploy( environment_name, @@ -289,6 +291,7 @@ def deploy( autoscaling_policy, protocol, enable_model_observability, + model_observability ) def undeploy(self, model_version: ModelVersion, environment_name: str = None): diff --git a/python/sdk/merlin/endpoint.py b/python/sdk/merlin/endpoint.py index 74f7377c3..f70373169 100644 --- a/python/sdk/merlin/endpoint.py +++ b/python/sdk/merlin/endpoint.py @@ -26,6 +26,7 @@ from merlin.util import autostr, get_url from merlin.resource_request import ResourceRequest from merlin.transformer import Transformer, TransformerType +from merlin.model_observability import ModelObservability from merlin.util import extract_optional_value_with_default class Status(Enum): @@ -104,6 +105,9 @@ def __init__(self, endpoint: client.VersionEndpoint, log_url: str = None): self._log_url = log_url self._enable_model_observability = extract_optional_value_with_default(endpoint.enable_model_observability, False) + if endpoint.model_observability is not None: + model_observability = ModelObservability.from_model_observability_response(endpoint.model_observability) + self._model_observability = extract_optional_value_with_default(model_observability, None) @property def url(self): @@ -165,6 +169,10 @@ def transformer(self) -> Optional[Transformer]: @property def enable_model_observability(self) -> bool: return self._enable_model_observability + + @property + def model_observability(self) -> Optional[client.ModelObservability]: + return self._model_observability def _repr_html_(self): return f"""{self._url}""" diff --git a/python/sdk/merlin/fluent.py b/python/sdk/merlin/fluent.py index 48010695f..e71d00c22 100644 --- a/python/sdk/merlin/fluent.py +++ b/python/sdk/merlin/fluent.py @@ -30,6 +30,7 @@ from merlin.resource_request import ResourceRequest from merlin.transformer import Transformer from merlin.version_image import VersionImage +from merlin.model_observability import ModelObservability _merlin_client: Optional[MerlinClient] = None _active_project: Optional[Project] @@ -384,6 +385,7 @@ def deploy( autoscaling_policy: AutoscalingPolicy = None, protocol: Protocol = None, enable_model_observability: bool = False, + model_observability: Optional[ModelObservability] = None, ) -> VersionEndpoint: """ Deploy a model version. @@ -399,6 +401,7 @@ def deploy( :param autoscaling_policy: autoscaling policy to be used for the deployment (default: None) :param protocol: protocol to be used by the deployed model (default: HTTP_JSON) :param enable_model_observability: flag to determine whether model observability enabled for the endpoint + :param model_observability: detail of model observability configuration :return: VersionEndpoint object """ _check_active_client() diff --git a/python/sdk/merlin/model.py b/python/sdk/merlin/model.py index 175cc4e70..fea84c6ab 100644 --- a/python/sdk/merlin/model.py +++ b/python/sdk/merlin/model.py @@ -55,6 +55,7 @@ from merlin.pyfunc import run_pyfunc_local_server from merlin.requirements import process_conda_env from merlin.resource_request import ResourceRequest +from merlin.model_observability import ModelObservability from merlin.transformer import Transformer from merlin.util import ( autostr, @@ -1211,6 +1212,7 @@ def deploy( autoscaling_policy: AutoscalingPolicy = None, protocol: Protocol = None, enable_model_observability: bool = False, + model_observability: Optional[ModelObservability] = None ) -> VersionEndpoint: """ Deploy current model to MLP One of log_model, log_pytorch_model, @@ -1246,6 +1248,7 @@ def deploy( target_env_vars: List[client.EnvVar] = [] target_transformer = None target_logger = None + target_model_observability = None # Get the currently deployed endpoint and if there's no deployed endpoint yet, use the default values for # non-nullable fields @@ -1346,6 +1349,10 @@ def deploy( if logger is not None: target_logger = logger.to_logger_spec() + if model_observability is not None: + print(f"model observability {model_observability}") + target_model_observability = model_observability.to_model_observability_request() + model = self._model endpoint_api = EndpointApi(self._api_client) @@ -1360,6 +1367,7 @@ def deploy( autoscaling_policy=target_autoscaling_policy, protocol=client.Protocol(target_protocol), enable_model_observability=enable_model_observability, + model_observability=target_model_observability, ) if current_endpoint is not None: # This allows a serving deployment to be updated while it is serving diff --git a/python/sdk/merlin/model_observability.py b/python/sdk/merlin/model_observability.py new file mode 100644 index 000000000..b0b5b17ff --- /dev/null +++ b/python/sdk/merlin/model_observability.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +import client +from typing import Optional +from merlin.util import autostr +from dataclasses import dataclass +from dataclasses_json import dataclass_json + + +@autostr +@dataclass_json +@dataclass +class GroundTruthSource: + table_urn: str + event_timestamp_column: str + dwh_project: str + +@autostr +@dataclass_json +@dataclass +class GroundTruthJob: + cron_schedule: str + service_account_secret_name: str + start_day_offset_from_now: int + end_day_offset_from_now: int + cpu_request: Optional[str] = None + cpu_limit: Optional[str] = None + memory_request: Optional[str] = None + memory_limit: Optional[str] = None + grace_period_day: Optional[int] = None + +@autostr +@dataclass_json +@dataclass +class PredictionLogIngestionResourceRequest: + replica: int + cpu_request: Optional[str] = None + memory_request: Optional[str] = None + +@autostr +@dataclass_json +@dataclass +class ModelObservability: + enabled: bool + ground_truth_source: Optional[GroundTruthSource] = None + ground_truth_job: Optional[GroundTruthJob] = None + prediction_log_ingestion_resource_request: Optional[PredictionLogIngestionResourceRequest] = None + + def to_model_observability_request(self) -> client.ModelObservability: + ground_truth_source = None + if self.ground_truth_source is not None: + ground_truth_source = client.GroundTruthSource( + table_urn=self.ground_truth_source.table_urn, + event_timestamp_column=self.ground_truth_source.event_timestamp_column, + dwh_project=self.ground_truth_source.dwh_project + ) + + ground_truth_job = None + if self.ground_truth_job is not None: + ground_truth_job = client.GroundTruthJob( + cron_schedule=self.ground_truth_job.cron_schedule, + service_account_secret_name=self.ground_truth_job.service_account_secret_name, + start_day_offset_from_now=self.ground_truth_job.start_day_offset_from_now, + end_day_offset_from_now=self.ground_truth_job.end_day_offset_from_now, + cpu_request=self.ground_truth_job.cpu_request, + cpu_limit=self.ground_truth_job.cpu_limit, + memory_request=self.ground_truth_job.memory_request, + memory_limit=self.ground_truth_job.memory_limit, + grace_period_day=self.ground_truth_job.grace_period_day + ) + + prediction_log_ingestion_resource_request = None + if self.prediction_log_ingestion_resource_request is not None: + prediction_log_ingestion_resource_request = client.PredictionLogIngestionResourceRequest( + replica=self.prediction_log_ingestion_resource_request.replica, + cpu_request=self.prediction_log_ingestion_resource_request.cpu_request, + memory_request=self.prediction_log_ingestion_resource_request.memory_request + ) + + return client.ModelObservability( + enabled=self.enabled, + ground_truth_source=ground_truth_source, + ground_truth_job=ground_truth_job, + prediction_log_ingestion_resource_request=prediction_log_ingestion_resource_request + ) + + @classmethod + def from_model_observability_response(cls, response: Optional[client.ModelObservability]) -> Optional[ModelObservability]: + if response is None: + return None + + ground_truth_source = None + if response.ground_truth_source is not None: + ground_truth_source = cls._ground_truth_source_from_response(response.ground_truth_source) + + ground_truth_job = None + if response.ground_truth_job is not None: + ground_truth_job = cls._job_from_response(response.ground_truth_job) + + prediction_log_ingestion_resource_request = None + if response.prediction_log_ingestion_resource_request is not None: + prediction_log_ingestion_resource_request = cls._prediction_log_ingestion_resource_request_from_response(response.prediction_log_ingestion_resource_request) + + return ModelObservability( + enabled=response.enabled, + ground_truth_source=ground_truth_source, + ground_truth_job=ground_truth_job, + prediction_log_ingestion_resource_request=prediction_log_ingestion_resource_request + ) + + @classmethod + def _ground_truth_source_from_response(cls, response: Optional[client.GroundTruthSource]) -> Optional[GroundTruthSource]: + if response is None: + return None + + return GroundTruthSource( + table_urn=response.table_urn, + event_timestamp_column=response.event_timestamp_column, + dwh_project=response.dwh_project + ) + + @classmethod + def _job_from_response(cls, response: Optional[client.GroundTruthJob]) -> Optional[GroundTruthJob]: + if response is None: + return None + + return GroundTruthJob( + cron_schedule=response.cron_schedule, + service_account_secret_name=response.service_account_secret_name, + start_day_offset_from_now=response.start_day_offset_from_now, + end_day_offset_from_now=response.end_day_offset_from_now, + cpu_request=response.cpu_request, + cpu_limit=response.cpu_limit, + memory_request=response.memory_request, + memory_limit=response.memory_limit, + grace_period_day=response.grace_period_day + ) + + @classmethod + def _prediction_log_ingestion_resource_request_from_response(cls, response: Optional[client.PredictionLogIngestionResourceRequest]) -> Optional[PredictionLogIngestionResourceRequest]: + if response is None: + return None + + return PredictionLogIngestionResourceRequest( + replica=response.replica, + cpu_request=response.cpu_request, + memory_request=response.memory_request + ) \ No newline at end of file diff --git a/python/sdk/test/model_observability_test.py b/python/sdk/test/model_observability_test.py new file mode 100644 index 000000000..817717e4f --- /dev/null +++ b/python/sdk/test/model_observability_test.py @@ -0,0 +1,63 @@ +import pytest +import client +from merlin.model_observability import ModelObservability, GroundTruthSource, GroundTruthJob, PredictionLogIngestionResourceRequest + +@pytest.mark.unit +@pytest.mark.parametrize( + "response,expected", [ + ( + client.ModelObservability( + enabled=True, + ground_truth_source=client.GroundTruthSource( + table_urn="table_urn", + event_timestamp_column="event_timestamp_column", + dwh_project="dwh_project" + ), + ground_truth_job=client.GroundTruthJob( + cron_schedule="cron_schedule", + service_account_secret_name="service_account_secret_name", + start_day_offset_from_now=1, + end_day_offset_from_now=1, + cpu_request="cpu_request", + cpu_limit="cpu_limit", + memory_request="memory_request", + memory_limit="memory_limit", + grace_period_day=1 + ), + prediction_log_ingestion_resource_request=client.PredictionLogIngestionResourceRequest( + replica=1, + cpu_request="1", + memory_request="1Gi" + ) + ), + ModelObservability( + enabled=True, + ground_truth_source=GroundTruthSource( + table_urn="table_urn", + event_timestamp_column="event_timestamp_column", + dwh_project="dwh_project" + ), + ground_truth_job=GroundTruthJob( + cron_schedule="cron_schedule", + service_account_secret_name="service_account_secret_name", + start_day_offset_from_now=1, + end_day_offset_from_now=1, + cpu_request="cpu_request", + cpu_limit="cpu_limit", + + memory_request="memory_request", + memory_limit="memory_limit", + grace_period_day=1 + ), + prediction_log_ingestion_resource_request=PredictionLogIngestionResourceRequest( + replica=1, + cpu_request="1", + memory_request="1Gi" + ) + ) + ) + ] +) +def test_from_model_observability_response(response, expected): + assert ModelObservability.from_model_observability_response(response) == expected + assert expected.to_model_observability_request() == response \ No newline at end of file diff --git a/python/sdk/test/model_test.py b/python/sdk/test/model_test.py index e57c4e226..776b50bee 100644 --- a/python/sdk/test/model_test.py +++ b/python/sdk/test/model_test.py @@ -33,6 +33,7 @@ from merlin.model import ModelType from merlin.model_schema import InferenceSchema, ModelSchema, RankingOutput, ValueType from merlin.protocol import Protocol +from merlin.model_observability import ModelObservability from urllib3_mock import Responses responses = Responses("requests.packages.urllib3") @@ -167,6 +168,39 @@ enable_model_observability=True, ) +more_granular_observability_cfg_ep = cl.VersionEndpoint( + id="8000", + version_id=1, + status="running", + url="localhost/1", + service_name="svc-1", + environment_name=env_3.name, + environment=env_3, + monitoring_url="grafana.com", + model_observability=cl.ModelObservability( + enabled=True, + ground_truth_source=cl.GroundTruthSource( + table_urn="table_urn", + event_timestamp_column="event_timestamp_column", + dwh_project="dwh_project", + ), + ground_truth_job=cl.GroundTruthJob( + cron_schedule="cron_schedule", + service_account_secret_name="service_account_secret_name", + start_day_offset_from_now=1, + end_day_offset_from_now=1, + cpu_request="cpu_request", + cpu_limit="cpu_limit", + memory_request="memory_request", + memory_limit="memory_limit", + grace_period_day=1, + ), + prediction_log_ingestion_resource_request=cl.PredictionLogIngestionResourceRequest( + replica=1, cpu_request="1", memory_request="1Gi" + ), + ), +) + rule_1 = cl.ModelEndpointRule( destinations=[ cl.ModelEndpointRuleDestination(version_endpoint_id=ep1.id, weight=100) @@ -838,6 +872,59 @@ def test_deploy_with_model_observability_enabled(self, version): assert endpoint.deployment_mode == DeploymentMode.SERVERLESS assert endpoint.enable_model_observability == True + + @responses.activate + def test_deploy_with_more_granular_model_observability_cfg(self, version): + responses.add( + "GET", + "/v1/environments", + body=json.dumps([env_3.to_dict()]), + status=200, + content_type="application/json", + ) + # This is the additional check which deploy makes to determine if there are any existing endpoints associated + responses.add( + "GET", + "/v1/models/1/versions/1/endpoint", + body=json.dumps([]), + status=200, + content_type="application/json", + ) + responses.add( + "POST", + "/v1/models/1/versions/1/endpoint", + body=json.dumps(more_granular_observability_cfg_ep.to_dict()), + status=201, + content_type="application/json", + ) + responses.add( + "GET", + "/v1/models/1/versions/1/endpoint", + body=json.dumps([more_granular_observability_cfg_ep.to_dict()]), + status=200, + content_type="application/json", + ) + responses.add( + "GET", + "/v1/models/1/versions/1/endpoint/8000", + body=json.dumps(more_granular_observability_cfg_ep.to_dict()), + status=200, + content_type="application/json", + ) + + model_observability = ModelObservability.from_model_observability_response(more_granular_observability_cfg_ep.model_observability) + endpoint = version.deploy( + environment_name=env_3.name, model_observability=model_observability + ) + + assert endpoint.id == more_granular_observability_cfg_ep.id + assert endpoint.status.value == more_granular_observability_cfg_ep.status + assert endpoint.environment_name == more_granular_observability_cfg_ep.environment_name + assert endpoint.environment.cluster == env_3.cluster + assert endpoint.environment.name == env_3.name + assert endpoint.deployment_mode == DeploymentMode.SERVERLESS + assert endpoint.model_observability == model_observability + @responses.activate def test_undeploy(self, version): responses.add( diff --git a/swagger.yaml b/swagger.yaml index d00463b53..7defb9820 100644 --- a/swagger.yaml +++ b/swagger.yaml @@ -1952,6 +1952,8 @@ components: "$ref": "#/components/schemas/Protocol" enable_model_observability: type: boolean + model_observability: + "$ref": "#/components/schemas/ModelObservability" created_at: type: string format: date-time @@ -2471,6 +2473,72 @@ components: "$ref": "#/components/schemas/PredictionJob" paging: "$ref": "#/components/schemas/Paging" + GroundTruthSource: + type: object + properties: + table_urn: + type: string + event_timestamp_column: + type: string + dwh_project: + type: string + required: + - table_urn + - event_timestamp_column + - dwh_project + + PredictionLogIngestionResourceRequest: + type: object + properties: + cpu_request: + type: string + memory_request: + type: string + replica: + type: integer + required: + - replica + + GroundTruthJob: + type: object + properties: + cron_schedule: + type: string + cpu_request: + type: string + cpu_limit: + type: string + memory_request: + type: string + memory_limit: + type: string + start_day_offset_from_now: + type: integer + end_day_offset_from_now: + type: integer + grace_period_day: + type: integer + service_account_secret_name: + type: string + required: + - cron_schedule + - service_account_secret_name + - start_day_offset_from_now + - end_day_offset_from_now + ModelObservability: + type: object + properties: + enabled: + type: boolean + ground_truth_source: + "$ref": "#/components/schemas/GroundTruthSource" + ground_truth_job: + "$ref": "#/components/schemas/GroundTruthJob" + prediction_log_ingestion_resource_request: + "$ref": "#/components/schemas/PredictionLogIngestionResourceRequest" + required: + - enabled + securitySchemes: Bearer: type: apiKey