From f8cc0838aac3219cdfc81218db4d71be545f8d7d Mon Sep 17 00:00:00 2001 From: Khor Shu Heng Date: Wed, 21 Feb 2024 16:36:55 +0800 Subject: [PATCH] refactor inference schema --- .../model_binary_classification_output.go | 34 +-- api/client/model_model_prediction_output.go | 72 +++--- api/client/model_ranking_output.go | 36 +-- api/client/model_schema_spec.go | 108 +++++++-- .../publisher/observation_sink.py | 33 +-- .../publisher/prediction_log_consumer.py | 17 +- .../publisher/prediction_log_parser.py | 25 ++- .../tests/test_observation_sink.py | 15 +- .../tests/test_prediction_log_consumer.py | 38 ++-- .../models/binary_classification_output.py | 6 +- python/sdk/client/models/ranking_output.py | 4 +- python/sdk/client/models/schema_spec.py | 8 +- python/sdk/client_README.md | 207 ++++++++++++++++++ python/sdk/merlin/observability/inference.py | 57 +++-- python/sdk/test/observability_test.py | 61 +++--- swagger.yaml | 10 +- 16 files changed, 503 insertions(+), 228 deletions(-) create mode 100644 python/sdk/client_README.md diff --git a/api/client/model_binary_classification_output.go b/api/client/model_binary_classification_output.go index a75023589..a93108ed5 100644 --- a/api/client/model_binary_classification_output.go +++ b/api/client/model_binary_classification_output.go @@ -21,7 +21,7 @@ var _ MappedNullable = &BinaryClassificationOutput{} // BinaryClassificationOutput struct for BinaryClassificationOutput type BinaryClassificationOutput struct { PredictionScoreColumn string `json:"prediction_score_column"` - ActualLabelColumn *string `json:"actual_label_column,omitempty"` + ActualScoreColumn *string `json:"actual_score_column,omitempty"` PositiveClassLabel string `json:"positive_class_label"` NegativeClassLabel string `json:"negative_class_label"` ScoreThreshold *float32 `json:"score_threshold,omitempty"` @@ -75,36 +75,36 @@ func (o *BinaryClassificationOutput) SetPredictionScoreColumn(v string) { o.PredictionScoreColumn = v } -// GetActualLabelColumn returns 
the ActualLabelColumn field value if set, zero value otherwise. -func (o *BinaryClassificationOutput) GetActualLabelColumn() string { - if o == nil || IsNil(o.ActualLabelColumn) { +// GetActualScoreColumn returns the ActualScoreColumn field value if set, zero value otherwise. +func (o *BinaryClassificationOutput) GetActualScoreColumn() string { + if o == nil || IsNil(o.ActualScoreColumn) { var ret string return ret } - return *o.ActualLabelColumn + return *o.ActualScoreColumn } -// GetActualLabelColumnOk returns a tuple with the ActualLabelColumn field value if set, nil otherwise +// GetActualScoreColumnOk returns a tuple with the ActualScoreColumn field value if set, nil otherwise // and a boolean to check if the value has been set. -func (o *BinaryClassificationOutput) GetActualLabelColumnOk() (*string, bool) { - if o == nil || IsNil(o.ActualLabelColumn) { +func (o *BinaryClassificationOutput) GetActualScoreColumnOk() (*string, bool) { + if o == nil || IsNil(o.ActualScoreColumn) { return nil, false } - return o.ActualLabelColumn, true + return o.ActualScoreColumn, true } -// HasActualLabelColumn returns a boolean if a field has been set. -func (o *BinaryClassificationOutput) HasActualLabelColumn() bool { - if o != nil && !IsNil(o.ActualLabelColumn) { +// HasActualScoreColumn returns a boolean if a field has been set. +func (o *BinaryClassificationOutput) HasActualScoreColumn() bool { + if o != nil && !IsNil(o.ActualScoreColumn) { return true } return false } -// SetActualLabelColumn gets a reference to the given string and assigns it to the ActualLabelColumn field. -func (o *BinaryClassificationOutput) SetActualLabelColumn(v string) { - o.ActualLabelColumn = &v +// SetActualScoreColumn gets a reference to the given string and assigns it to the ActualScoreColumn field. 
+func (o *BinaryClassificationOutput) SetActualScoreColumn(v string) { + o.ActualScoreColumn = &v } // GetPositiveClassLabel returns the PositiveClassLabel field value @@ -222,8 +222,8 @@ func (o BinaryClassificationOutput) MarshalJSON() ([]byte, error) { func (o BinaryClassificationOutput) ToMap() (map[string]interface{}, error) { toSerialize := map[string]interface{}{} toSerialize["prediction_score_column"] = o.PredictionScoreColumn - if !IsNil(o.ActualLabelColumn) { - toSerialize["actual_label_column"] = o.ActualLabelColumn + if !IsNil(o.ActualScoreColumn) { + toSerialize["actual_score_column"] = o.ActualScoreColumn } toSerialize["positive_class_label"] = o.PositiveClassLabel toSerialize["negative_class_label"] = o.NegativeClassLabel diff --git a/api/client/model_model_prediction_output.go b/api/client/model_model_prediction_output.go index 3e0d72e8b..c5313594f 100644 --- a/api/client/model_model_prediction_output.go +++ b/api/client/model_model_prediction_output.go @@ -46,58 +46,50 @@ func RegressionOutputAsModelPredictionOutput(v *RegressionOutput) ModelPredictio // Unmarshal JSON data into one of the pointers in the struct func (dst *ModelPredictionOutput) UnmarshalJSON(data []byte) error { var err error - match := 0 - // try to unmarshal data into BinaryClassificationOutput - err = newStrictDecoder(data).Decode(&dst.BinaryClassificationOutput) - if err == nil { - jsonBinaryClassificationOutput, _ := json.Marshal(dst.BinaryClassificationOutput) - if string(jsonBinaryClassificationOutput) == "{}" { // empty struct - dst.BinaryClassificationOutput = nil + // use discriminator value to speed up the lookup + var jsonDict map[string]interface{} + err = newStrictDecoder(data).Decode(&jsonDict) + if err != nil { + return fmt.Errorf("failed to unmarshal JSON into map for the discriminator lookup") + } + + // check if the discriminator value is 'BinaryClassificationOutput' + if jsonDict["output_class"] == "BinaryClassificationOutput" { + // try to unmarshal JSON data 
into BinaryClassificationOutput + err = json.Unmarshal(data, &dst.BinaryClassificationOutput) + if err == nil { + return nil // data stored in dst.BinaryClassificationOutput, return on the first match } else { - match++ + dst.BinaryClassificationOutput = nil + return fmt.Errorf("failed to unmarshal ModelPredictionOutput as BinaryClassificationOutput: %s", err.Error()) } - } else { - dst.BinaryClassificationOutput = nil } - // try to unmarshal data into RankingOutput - err = newStrictDecoder(data).Decode(&dst.RankingOutput) - if err == nil { - jsonRankingOutput, _ := json.Marshal(dst.RankingOutput) - if string(jsonRankingOutput) == "{}" { // empty struct - dst.RankingOutput = nil + // check if the discriminator value is 'RankingOutput' + if jsonDict["output_class"] == "RankingOutput" { + // try to unmarshal JSON data into RankingOutput + err = json.Unmarshal(data, &dst.RankingOutput) + if err == nil { + return nil // data stored in dst.RankingOutput, return on the first match } else { - match++ + dst.RankingOutput = nil + return fmt.Errorf("failed to unmarshal ModelPredictionOutput as RankingOutput: %s", err.Error()) } - } else { - dst.RankingOutput = nil } - // try to unmarshal data into RegressionOutput - err = newStrictDecoder(data).Decode(&dst.RegressionOutput) - if err == nil { - jsonRegressionOutput, _ := json.Marshal(dst.RegressionOutput) - if string(jsonRegressionOutput) == "{}" { // empty struct - dst.RegressionOutput = nil + // check if the discriminator value is 'RegressionOutput' + if jsonDict["output_class"] == "RegressionOutput" { + // try to unmarshal JSON data into RegressionOutput + err = json.Unmarshal(data, &dst.RegressionOutput) + if err == nil { + return nil // data stored in dst.RegressionOutput, return on the first match } else { - match++ + dst.RegressionOutput = nil + return fmt.Errorf("failed to unmarshal ModelPredictionOutput as RegressionOutput: %s", err.Error()) } - } else { - dst.RegressionOutput = nil } - if match > 1 { // more than 1 
match - // reset to nil - dst.BinaryClassificationOutput = nil - dst.RankingOutput = nil - dst.RegressionOutput = nil - - return fmt.Errorf("data matches more than one schema in oneOf(ModelPredictionOutput)") - } else if match == 1 { - return nil // exactly one match - } else { // no match - return fmt.Errorf("data failed to match schemas in oneOf(ModelPredictionOutput)") - } + return nil } // Marshal data from the first non-nil pointers in the struct to JSON diff --git a/api/client/model_ranking_output.go b/api/client/model_ranking_output.go index 93b947066..a339867c8 100644 --- a/api/client/model_ranking_output.go +++ b/api/client/model_ranking_output.go @@ -20,10 +20,9 @@ var _ MappedNullable = &RankingOutput{} // RankingOutput struct for RankingOutput type RankingOutput struct { - RankScoreColumn string `json:"rank_score_column"` - PredictionGroupIdColumn string `json:"prediction_group_id_column"` - RelevanceScoreColumn *string `json:"relevance_score_column,omitempty"` - OutputClass ModelPredictionOutputClass `json:"output_class"` + RankScoreColumn string `json:"rank_score_column"` + RelevanceScoreColumn *string `json:"relevance_score_column,omitempty"` + OutputClass ModelPredictionOutputClass `json:"output_class"` } type _RankingOutput RankingOutput @@ -32,10 +31,9 @@ type _RankingOutput RankingOutput // This constructor will assign default values to properties that have it defined, // and makes sure properties required by API are set, but the set of arguments // will change when the set of required properties is changed -func NewRankingOutput(rankScoreColumn string, predictionGroupIdColumn string, outputClass ModelPredictionOutputClass) *RankingOutput { +func NewRankingOutput(rankScoreColumn string, outputClass ModelPredictionOutputClass) *RankingOutput { this := RankingOutput{} this.RankScoreColumn = rankScoreColumn - this.PredictionGroupIdColumn = predictionGroupIdColumn this.OutputClass = outputClass return &this } @@ -72,30 +70,6 @@ func (o 
*RankingOutput) SetRankScoreColumn(v string) { o.RankScoreColumn = v } -// GetPredictionGroupIdColumn returns the PredictionGroupIdColumn field value -func (o *RankingOutput) GetPredictionGroupIdColumn() string { - if o == nil { - var ret string - return ret - } - - return o.PredictionGroupIdColumn -} - -// GetPredictionGroupIdColumnOk returns a tuple with the PredictionGroupIdColumn field value -// and a boolean to check if the value has been set. -func (o *RankingOutput) GetPredictionGroupIdColumnOk() (*string, bool) { - if o == nil { - return nil, false - } - return &o.PredictionGroupIdColumn, true -} - -// SetPredictionGroupIdColumn sets field value -func (o *RankingOutput) SetPredictionGroupIdColumn(v string) { - o.PredictionGroupIdColumn = v -} - // GetRelevanceScoreColumn returns the RelevanceScoreColumn field value if set, zero value otherwise. func (o *RankingOutput) GetRelevanceScoreColumn() string { if o == nil || IsNil(o.RelevanceScoreColumn) { @@ -163,7 +137,6 @@ func (o RankingOutput) MarshalJSON() ([]byte, error) { func (o RankingOutput) ToMap() (map[string]interface{}, error) { toSerialize := map[string]interface{}{} toSerialize["rank_score_column"] = o.RankScoreColumn - toSerialize["prediction_group_id_column"] = o.PredictionGroupIdColumn if !IsNil(o.RelevanceScoreColumn) { toSerialize["relevance_score_column"] = o.RelevanceScoreColumn } @@ -177,7 +150,6 @@ func (o *RankingOutput) UnmarshalJSON(bytes []byte) (err error) { // that every required field exists as a key in the generic map. 
requiredProperties := []string{ "rank_score_column", - "prediction_group_id_column", "output_class", } diff --git a/api/client/model_schema_spec.go b/api/client/model_schema_spec.go index ec474cb10..9616b1e84 100644 --- a/api/client/model_schema_spec.go +++ b/api/client/model_schema_spec.go @@ -20,7 +20,9 @@ var _ MappedNullable = &SchemaSpec{} // SchemaSpec struct for SchemaSpec type SchemaSpec struct { - PredictionIdColumn string `json:"prediction_id_column"` + PredictionIdColumn *string `json:"prediction_id_column,omitempty"` + SessionIdColumn *string `json:"session_id_column,omitempty"` + RowIdColumn *string `json:"row_id_column,omitempty"` ModelPredictionOutput ModelPredictionOutput `json:"model_prediction_output"` TagColumns []string `json:"tag_columns,omitempty"` FeatureTypes map[string]ValueType `json:"feature_types"` @@ -32,9 +34,8 @@ type _SchemaSpec SchemaSpec // This constructor will assign default values to properties that have it defined, // and makes sure properties required by API are set, but the set of arguments // will change when the set of required properties is changed -func NewSchemaSpec(predictionIdColumn string, modelPredictionOutput ModelPredictionOutput, featureTypes map[string]ValueType) *SchemaSpec { +func NewSchemaSpec(modelPredictionOutput ModelPredictionOutput, featureTypes map[string]ValueType) *SchemaSpec { this := SchemaSpec{} - this.PredictionIdColumn = predictionIdColumn this.ModelPredictionOutput = modelPredictionOutput this.FeatureTypes = featureTypes return &this @@ -48,28 +49,100 @@ func NewSchemaSpecWithDefaults() *SchemaSpec { return &this } -// GetPredictionIdColumn returns the PredictionIdColumn field value +// GetPredictionIdColumn returns the PredictionIdColumn field value if set, zero value otherwise. 
func (o *SchemaSpec) GetPredictionIdColumn() string { - if o == nil { + if o == nil || IsNil(o.PredictionIdColumn) { var ret string return ret } - - return o.PredictionIdColumn + return *o.PredictionIdColumn } -// GetPredictionIdColumnOk returns a tuple with the PredictionIdColumn field value +// GetPredictionIdColumnOk returns a tuple with the PredictionIdColumn field value if set, nil otherwise // and a boolean to check if the value has been set. func (o *SchemaSpec) GetPredictionIdColumnOk() (*string, bool) { - if o == nil { + if o == nil || IsNil(o.PredictionIdColumn) { return nil, false } - return &o.PredictionIdColumn, true + return o.PredictionIdColumn, true +} + +// HasPredictionIdColumn returns a boolean if a field has been set. +func (o *SchemaSpec) HasPredictionIdColumn() bool { + if o != nil && !IsNil(o.PredictionIdColumn) { + return true + } + + return false } -// SetPredictionIdColumn sets field value +// SetPredictionIdColumn gets a reference to the given string and assigns it to the PredictionIdColumn field. func (o *SchemaSpec) SetPredictionIdColumn(v string) { - o.PredictionIdColumn = v + o.PredictionIdColumn = &v +} + +// GetSessionIdColumn returns the SessionIdColumn field value if set, zero value otherwise. +func (o *SchemaSpec) GetSessionIdColumn() string { + if o == nil || IsNil(o.SessionIdColumn) { + var ret string + return ret + } + return *o.SessionIdColumn +} + +// GetSessionIdColumnOk returns a tuple with the SessionIdColumn field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *SchemaSpec) GetSessionIdColumnOk() (*string, bool) { + if o == nil || IsNil(o.SessionIdColumn) { + return nil, false + } + return o.SessionIdColumn, true +} + +// HasSessionIdColumn returns a boolean if a field has been set. 
+func (o *SchemaSpec) HasSessionIdColumn() bool { + if o != nil && !IsNil(o.SessionIdColumn) { + return true + } + + return false +} + +// SetSessionIdColumn gets a reference to the given string and assigns it to the SessionIdColumn field. +func (o *SchemaSpec) SetSessionIdColumn(v string) { + o.SessionIdColumn = &v +} + +// GetRowIdColumn returns the RowIdColumn field value if set, zero value otherwise. +func (o *SchemaSpec) GetRowIdColumn() string { + if o == nil || IsNil(o.RowIdColumn) { + var ret string + return ret + } + return *o.RowIdColumn +} + +// GetRowIdColumnOk returns a tuple with the RowIdColumn field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *SchemaSpec) GetRowIdColumnOk() (*string, bool) { + if o == nil || IsNil(o.RowIdColumn) { + return nil, false + } + return o.RowIdColumn, true +} + +// HasRowIdColumn returns a boolean if a field has been set. +func (o *SchemaSpec) HasRowIdColumn() bool { + if o != nil && !IsNil(o.RowIdColumn) { + return true + } + + return false +} + +// SetRowIdColumn gets a reference to the given string and assigns it to the RowIdColumn field. 
+func (o *SchemaSpec) SetRowIdColumn(v string) { + o.RowIdColumn = &v } // GetModelPredictionOutput returns the ModelPredictionOutput field value @@ -162,7 +235,15 @@ func (o SchemaSpec) MarshalJSON() ([]byte, error) { func (o SchemaSpec) ToMap() (map[string]interface{}, error) { toSerialize := map[string]interface{}{} - toSerialize["prediction_id_column"] = o.PredictionIdColumn + if !IsNil(o.PredictionIdColumn) { + toSerialize["prediction_id_column"] = o.PredictionIdColumn + } + if !IsNil(o.SessionIdColumn) { + toSerialize["session_id_column"] = o.SessionIdColumn + } + if !IsNil(o.RowIdColumn) { + toSerialize["row_id_column"] = o.RowIdColumn + } toSerialize["model_prediction_output"] = o.ModelPredictionOutput if !IsNil(o.TagColumns) { toSerialize["tag_columns"] = o.TagColumns @@ -176,7 +257,6 @@ func (o *SchemaSpec) UnmarshalJSON(bytes []byte) (err error) { // by unmarshalling the object into a generic map with string keys and checking // that every required field exists as a key in the generic map. 
requiredProperties := []string{ - "prediction_id_column", "model_prediction_output", "feature_types", } diff --git a/python/observation-publisher/publisher/observation_sink.py b/python/observation-publisher/publisher/observation_sink.py index a09ddbb6c..b315e815b 100644 --- a/python/observation-publisher/publisher/observation_sink.py +++ b/python/observation-publisher/publisher/observation_sink.py @@ -15,14 +15,13 @@ from google.cloud.bigquery import (SchemaField, Table, TimePartitioning, TimePartitioningType) from merlin.observability.inference import (BinaryClassificationOutput, - InferenceSchema, ObservationType, + InferenceSchema, RankingOutput, RegressionOutput, ValueType) from publisher.config import ObservationSinkConfig, ObservationSinkType -from publisher.prediction_log_parser import (MODEL_VERSION_COLUMN, - PREDICTION_LOG_TIMESTAMP_COLUMN, - ROW_ID_COLUMN, SESSION_ID_COLUMN) +from publisher.prediction_log_parser import (PREDICTION_LOG_MODEL_VERSION_COLUMN, + PREDICTION_LOG_TIMESTAMP_COLUMN) class ObservationSink(abc.ABC): @@ -83,7 +82,6 @@ def _common_arize_schema_attributes(self) -> dict: feature_column_names=self._inference_schema.feature_columns, prediction_id_column_name=self._inference_schema.prediction_id_column, timestamp_column_name=PREDICTION_LOG_TIMESTAMP_COLUMN, - tag_column_names=self._inference_schema.tag_columns, ) def _to_arize_schema(self) -> Tuple[ArizeModelType, ArizeSchema]: @@ -102,7 +100,7 @@ def _to_arize_schema(self) -> Tuple[ArizeModelType, ArizeSchema]: elif isinstance(prediction_output, RankingOutput): schema_attributes = self._common_arize_schema_attributes() | dict( rank_column_name=prediction_output.rank_column, - prediction_group_id_column_name=SESSION_ID_COLUMN, + prediction_group_id_column_name=self._inference_schema.session_id_column, ) model_type = ArizeModelType.RANKING else: @@ -113,21 +111,10 @@ def _to_arize_schema(self) -> Tuple[ArizeModelType, ArizeSchema]: return model_type, ArizeSchema(**schema_attributes) def 
write(self, df: pd.DataFrame): - df[self._inference_schema.prediction_id_column] = ( - df[SESSION_ID_COLUMN] + df[ROW_ID_COLUMN] - ) - if isinstance(self._inference_schema.model_prediction_output, RankingOutput): - df[ - self._inference_schema.model_prediction_output.prediction_group_id_column - ] = df[SESSION_ID_COLUMN] - - processed_df = self._inference_schema.model_prediction_output.preprocess( - df, [ObservationType.FEATURE, ObservationType.PREDICTION] - ) model_type, arize_schema = self._to_arize_schema() try: self._client.log( - dataframe=processed_df, + dataframe=df, environment=Environments.PRODUCTION, schema=arize_schema, model_id=self._model_id, @@ -248,11 +235,15 @@ def schema_fields(self) -> List[SchemaField]: schema_fields = [ SchemaField( - name=SESSION_ID_COLUMN, + name=self._inference_schema.session_id_column, + field_type="STRING", + ), + SchemaField( + name=self._inference_schema.row_id_column, field_type="STRING", ), SchemaField( - name=ROW_ID_COLUMN, + name=self._inference_schema.prediction_id_column, field_type="STRING", ), SchemaField( @@ -260,7 +251,7 @@ def schema_fields(self) -> List[SchemaField]: field_type="TIMESTAMP", ), SchemaField( - name=MODEL_VERSION_COLUMN, + name=PREDICTION_LOG_MODEL_VERSION_COLUMN, field_type="STRING", ), ] diff --git a/python/observation-publisher/publisher/prediction_log_consumer.py b/python/observation-publisher/publisher/prediction_log_consumer.py index eef099c53..a29d639a8 100644 --- a/python/observation-publisher/publisher/prediction_log_consumer.py +++ b/python/observation-publisher/publisher/prediction_log_consumer.py @@ -9,14 +9,13 @@ from caraml.upi.v1.prediction_log_pb2 import PredictionLog from confluent_kafka import Consumer, KafkaException from dataclasses_json import DataClassJsonMixin, dataclass_json -from merlin.observability.inference import InferenceSchema +from merlin.observability.inference import InferenceSchema, ObservationType from publisher.config import ObservationSource, 
ObservationSourceConfig from publisher.metric import MetricWriter from publisher.observation_sink import ObservationSink -from publisher.prediction_log_parser import (MODEL_VERSION_COLUMN, +from publisher.prediction_log_parser import (PREDICTION_LOG_MODEL_VERSION_COLUMN, PREDICTION_LOG_TIMESTAMP_COLUMN, - ROW_ID_COLUMN, SESSION_ID_COLUMN, PredictionLogFeatureTable, PredictionLogResultsTable) @@ -46,7 +45,6 @@ def start_polling( ): try: buffered_logs = [] - buffered_max_duration_seconds = 60 buffer_start_time = datetime.now() while True: logs = self.poll_new_logs() @@ -56,7 +54,7 @@ def start_polling( buffered_duration = (datetime.now() - buffer_start_time).seconds if ( len(buffered_logs) < self.buffer_capacity - and buffered_duration < buffered_max_duration_seconds + and buffered_duration < self.buffer_max_duration_seconds ): continue df = log_batch_to_dataframe( @@ -188,10 +186,10 @@ def log_to_records( feature_table.columns + prediction_results_table.columns + [ - SESSION_ID_COLUMN, - ROW_ID_COLUMN, + inference_schema.session_id_column, + inference_schema.row_id_column, PREDICTION_LOG_TIMESTAMP_COLUMN, - MODEL_VERSION_COLUMN, + PREDICTION_LOG_MODEL_VERSION_COLUMN, ] ) @@ -206,4 +204,5 @@ def log_batch_to_dataframe( for log in logs: rows, column_names = log_to_records(log, inference_schema, model_version) combined_records.extend(rows) - return pd.DataFrame.from_records(combined_records, columns=column_names) + df = pd.DataFrame.from_records(combined_records, columns=column_names) + return inference_schema.preprocess(df, [ObservationType.PREDICTION]) diff --git a/python/observation-publisher/publisher/prediction_log_parser.py b/python/observation-publisher/publisher/prediction_log_parser.py index 97ecd4d09..4541d7b07 100644 --- a/python/observation-publisher/publisher/prediction_log_parser.py +++ b/python/observation-publisher/publisher/prediction_log_parser.py @@ -3,14 +3,12 @@ from typing import Dict, List, Optional, Union import numpy as np -from 
google.protobuf.internal.well_known_types import ListValue, Struct +from google.protobuf.struct_pb2 import ListValue, Struct from merlin.observability.inference import InferenceSchema, ValueType from typing_extensions import Self -SESSION_ID_COLUMN = "session_id" -ROW_ID_COLUMN = "row_id" PREDICTION_LOG_TIMESTAMP_COLUMN = "request_timestamp" -MODEL_VERSION_COLUMN = "model_version" +PREDICTION_LOG_MODEL_VERSION_COLUMN = "model_version" @dataclass @@ -58,8 +56,16 @@ def from_struct( def convert_to_numpy_value( - col_value: Optional[int | str | float | bool], value_type: ValueType + col_value: Optional[int | str | float | bool], value_type: Optional[ValueType] ) -> np.int64 | np.float64 | np.bool_ | np.str_: + if value_type is None: + if isinstance(col_value, (int, float)): + return np.float64(col_value) + if isinstance(col_value, str): + return np.str_(col_value) + else: + raise ValueError(f"Unable to infer numpy type for type: {type(col_value)}") + match value_type: case ValueType.INT64: assert isinstance(col_value, (int, float)) @@ -79,7 +85,7 @@ def convert_to_numpy_value( def list_value_as_string_list(list_value: ListValue) -> List[str]: string_list: List[str] = [] - for v in list_value: + for v in list_value.items(): assert isinstance(v, str) string_list.append(v) return string_list @@ -87,9 +93,10 @@ def list_value_as_string_list(list_value: ListValue) -> List[str]: def list_value_as_rows(list_value: ListValue) -> List[ListValue]: rows: List[ListValue] = [] - for d in list_value: + for d in list_value.items(): assert isinstance(d, ListValue) rows.append(d) + return rows @@ -97,11 +104,11 @@ def list_value_as_numpy_list( list_value: ListValue, column_names: List[str], column_types: Dict[str, ValueType] ) -> List[np.int64 | np.float64 | np.bool_ | np.str_]: column_values: List[int | str | float | bool | None] = [] - for v in list_value: + for v in list_value.items(): assert isinstance(v, (int, str, float, bool, NoneType)) column_values.append(v) return [ - 
convert_to_numpy_value(col_value, column_types[col_name]) + convert_to_numpy_value(col_value, column_types.get(col_name)) for col_value, col_name in zip(column_values, column_names) ] diff --git a/python/observation-publisher/tests/test_observation_sink.py b/python/observation-publisher/tests/test_observation_sink.py index f12306a8e..58b5df3c5 100644 --- a/python/observation-publisher/tests/test_observation_sink.py +++ b/python/observation-publisher/tests/test_observation_sink.py @@ -62,16 +62,17 @@ def ranking_inference_logs() -> pd.DataFrame: request_timestamp = datetime(2024, 1, 1, 0, 0, 0).astimezone(tz.UTC) return pd.DataFrame.from_records( [ - [5.0, 1.0, "1234", "1001", request_timestamp], - [4.0, 0.9, "1234", "1002", request_timestamp], - [3.0, 0.8, "1234", "1003", request_timestamp], + [5.0, 1.0, "1234", "1001", request_timestamp, 1], + [4.0, 0.9, "1234", "1002", request_timestamp, 1], + [3.0, 0.8, "1234", "1003", request_timestamp, 1], ], columns=[ "rating", "rank_score", - "session_id", - "row_id", + "order_id", + "driver_id", "request_timestamp", + "_rank" ], ) @@ -112,7 +113,6 @@ def test_binary_classification_model_preprocessing_for_arize( def test_ranking_model_preprocessing_for_arize( - binary_classification_inference_logs: pd.DataFrame, ranking_inference_logs: pd.DataFrame, ): inference_schema = InferenceSchema( @@ -121,9 +121,10 @@ def test_ranking_model_preprocessing_for_arize( }, model_prediction_output=RankingOutput( rank_score_column="rank_score", - prediction_group_id_column="order_id", relevance_score_column="relevance_score_column", ), + session_id_column="order_id", + row_id_column="driver_id", ) arize_client = MockArizeClient(api_key="test", space_key="test") arize_sink = ArizeSink( diff --git a/python/observation-publisher/tests/test_prediction_log_consumer.py b/python/observation-publisher/tests/test_prediction_log_consumer.py index c41f9b60d..6c2920557 100644 --- a/python/observation-publisher/tests/test_prediction_log_consumer.py +++ 
b/python/observation-publisher/tests/test_prediction_log_consumer.py @@ -14,7 +14,7 @@ def new_prediction_log( model_id: str, model_version: str, - prediction_id: str, + session_id: str, row_ids: List[str], input_columns: List[str], input_data: List[List[Any]], @@ -30,7 +30,7 @@ def new_prediction_log( raise ValueError("input columns and input data must have the same length") prediction_log = PredictionLog() - prediction_log.prediction_id = prediction_id + prediction_log.prediction_id = session_id prediction_log.model_name = model_id prediction_log.model_version = model_version prediction_log.input.features_table.update( @@ -62,11 +62,13 @@ def test_log_to_dataframe(): }, model_prediction_output=BinaryClassificationOutput( prediction_score_column="prediction_score", - actual_label_column="actual_label", + actual_score_column="actual_score", positive_class_label="fraud", negative_class_label="non fraud", score_threshold=0.5, ), + session_id_column="order_id", + row_id_column="driver_id" ) input_columns = [ "acceptance_rate", @@ -77,7 +79,7 @@ def test_log_to_dataframe(): request_timestamp = datetime(2021, 1, 1, 0, 0, 0) prediction_logs = [ new_prediction_log( - prediction_id="1234", + session_id="1234", model_id=model_id, model_version=model_version, input_columns=input_columns, @@ -94,7 +96,7 @@ def test_log_to_dataframe(): row_ids=["a", "b"], ), new_prediction_log( - prediction_id="5678", + session_id="5678", model_id=model_id, model_version=model_version, input_columns=input_columns, @@ -116,23 +118,25 @@ def test_log_to_dataframe(): ) expected_df = pd.DataFrame.from_records( [ - [0.8, 24, "FOOD", 0.9, "1234", "a", request_timestamp, model_version], - [0.5, 2, "RIDE", 0.5, "1234", "b", request_timestamp, model_version], - [1.0, 13, "CAR", 0.4, "5678", "c", request_timestamp, model_version], - [0.4, 60, "RIDE", 0.2, "5678", "d", request_timestamp, model_version], + [0.8, 24, "FOOD", 0.9, "fraud", "1234", "a", "1234_a", request_timestamp, model_version], + [0.5, 2, 
"RIDE", 0.5, "fraud", "1234", "b", "1234_b", request_timestamp, model_version], + [1.0, 13, "CAR", 0.4, "non fraud", "5678", "c", "5678_c", request_timestamp, model_version], + [0.4, 60, "RIDE", 0.2, "non fraud", "5678", "d", "5678_d", request_timestamp, model_version], ], columns=[ "acceptance_rate", "minutes_since_last_order", "service_type", "prediction_score", - "session_id", - "row_id", + "_prediction_label", + "order_id", + "driver_id", + "prediction_id", "request_timestamp", "model_version", ], ) - assert_frame_equal(prediction_logs_df, expected_df) + assert_frame_equal(prediction_logs_df, expected_df, check_like=True) def test_empty_column_conversion_to_dataframe(): @@ -144,7 +148,7 @@ def test_empty_column_conversion_to_dataframe(): }, model_prediction_output=BinaryClassificationOutput( prediction_score_column="prediction_score", - actual_label_column="actual_label", + actual_score_column="actual_score", positive_class_label="fraud", negative_class_label="non fraud", score_threshold=0.5, @@ -152,7 +156,7 @@ def test_empty_column_conversion_to_dataframe(): ) prediction_logs = [ new_prediction_log( - prediction_id="1234", + session_id="1234", model_id=model_id, model_version=model_version, input_columns=["acceptance_rate"], @@ -175,8 +179,10 @@ def test_empty_column_conversion_to_dataframe(): [ np.NaN, 0.5, + "fraud", "1234", "a", + "1234_a", datetime(2021, 1, 1, 0, 0, 0), "0.1.0", ], @@ -184,10 +190,12 @@ def test_empty_column_conversion_to_dataframe(): columns=[ "acceptance_rate", "prediction_score", + "_prediction_label", "session_id", "row_id", + "prediction_id", "request_timestamp", "model_version", ], ) - assert_frame_equal(prediction_logs_df, expected_df) + assert_frame_equal(prediction_logs_df, expected_df, check_like=True) diff --git a/python/sdk/client/models/binary_classification_output.py b/python/sdk/client/models/binary_classification_output.py index b99ebf437..253c9df07 100644 --- a/python/sdk/client/models/binary_classification_output.py +++ 
b/python/sdk/client/models/binary_classification_output.py @@ -31,12 +31,12 @@ class BinaryClassificationOutput(BaseModel): BinaryClassificationOutput """ # noqa: E501 prediction_score_column: StrictStr - actual_label_column: Optional[StrictStr] = None + actual_score_column: Optional[StrictStr] = None positive_class_label: StrictStr negative_class_label: StrictStr score_threshold: Optional[Union[StrictFloat, StrictInt]] = None output_class: ModelPredictionOutputClass - __properties: ClassVar[List[str]] = ["prediction_score_column", "actual_label_column", "positive_class_label", "negative_class_label", "score_threshold", "output_class"] + __properties: ClassVar[List[str]] = ["prediction_score_column", "actual_score_column", "positive_class_label", "negative_class_label", "score_threshold", "output_class"] model_config = { "populate_by_name": True, @@ -87,7 +87,7 @@ def from_dict(cls, obj: Dict) -> Self: _obj = cls.model_validate({ "prediction_score_column": obj.get("prediction_score_column"), - "actual_label_column": obj.get("actual_label_column"), + "actual_score_column": obj.get("actual_score_column"), "positive_class_label": obj.get("positive_class_label"), "negative_class_label": obj.get("negative_class_label"), "score_threshold": obj.get("score_threshold"), diff --git a/python/sdk/client/models/ranking_output.py b/python/sdk/client/models/ranking_output.py index 31f673a75..4e9cf30a4 100644 --- a/python/sdk/client/models/ranking_output.py +++ b/python/sdk/client/models/ranking_output.py @@ -31,10 +31,9 @@ class RankingOutput(BaseModel): RankingOutput """ # noqa: E501 rank_score_column: StrictStr - prediction_group_id_column: StrictStr relevance_score_column: Optional[StrictStr] = None output_class: ModelPredictionOutputClass - __properties: ClassVar[List[str]] = ["rank_score_column", "prediction_group_id_column", "relevance_score_column", "output_class"] + __properties: ClassVar[List[str]] = ["rank_score_column", "relevance_score_column", "output_class"] 
model_config = { "populate_by_name": True, @@ -85,7 +84,6 @@ def from_dict(cls, obj: Dict) -> Self: _obj = cls.model_validate({ "rank_score_column": obj.get("rank_score_column"), - "prediction_group_id_column": obj.get("prediction_group_id_column"), "relevance_score_column": obj.get("relevance_score_column"), "output_class": obj.get("output_class") }) diff --git a/python/sdk/client/models/schema_spec.py b/python/sdk/client/models/schema_spec.py index 26e02f4e2..a50dff7b3 100644 --- a/python/sdk/client/models/schema_spec.py +++ b/python/sdk/client/models/schema_spec.py @@ -31,11 +31,13 @@ class SchemaSpec(BaseModel): """ SchemaSpec """ # noqa: E501 - prediction_id_column: StrictStr + prediction_id_column: Optional[StrictStr] = None + session_id_column: Optional[StrictStr] = None + row_id_column: Optional[StrictStr] = None model_prediction_output: ModelPredictionOutput tag_columns: Optional[List[StrictStr]] = None feature_types: Dict[str, ValueType] - __properties: ClassVar[List[str]] = ["prediction_id_column", "model_prediction_output", "tag_columns", "feature_types"] + __properties: ClassVar[List[str]] = ["prediction_id_column", "session_id_column", "row_id_column", "model_prediction_output", "tag_columns", "feature_types"] model_config = { "populate_by_name": True, @@ -89,6 +91,8 @@ def from_dict(cls, obj: Dict) -> Self: _obj = cls.model_validate({ "prediction_id_column": obj.get("prediction_id_column"), + "session_id_column": obj.get("session_id_column"), + "row_id_column": obj.get("row_id_column"), "model_prediction_output": ModelPredictionOutput.from_dict(obj.get("model_prediction_output")) if obj.get("model_prediction_output") is not None else None, "tag_columns": obj.get("tag_columns"), "feature_types": dict((_k, _v) for _k, _v in obj.get("feature_types").items()) diff --git a/python/sdk/client_README.md b/python/sdk/client_README.md new file mode 100644 index 000000000..069653994 --- /dev/null +++ b/python/sdk/client_README.md @@ -0,0 +1,207 @@ +# merlin-sdk 
+API Guide for accessing Merlin's model management, deployment, and serving functionalities + +The `client` package is automatically generated by the [OpenAPI Generator](https://openapi-generator.tech) project: + +- API version: 0.14.0 +- Package version: 1.0.0 +- Build package: org.openapitools.codegen.languages.PythonClientCodegen + +## Requirements. + +Python 3.7+ + +## Installation & Usage + +This python library package is generated without supporting files like setup.py or requirements files + +To be able to use it, you will need these dependencies in your own package that uses this library: + +* urllib3 >= 1.25.3 +* python-dateutil +* pydantic + +## Getting Started + +In your own code, to use this library to connect and interact with merlin-sdk, +you can run the following: + +```python + +import os +import client +from client.rest import ApiException +from pprint import pprint + +# Defining the host is optional and defaults to http://localhost:8080/v1 +# See configuration.py for a list of all supported configuration parameters. +configuration = client.Configuration( + host = "http://localhost:8080/v1" +) + +# The client must configure the authentication and authorization parameters +# in accordance with the API server security policy. +# Examples for each auth method are provided below, use the example that +# satisfies your auth use case. + +# Configure API key authorization: Bearer +configuration.api_key['Bearer'] = os.environ["API_KEY"] + +# Uncomment below to setup prefix (e.g. Bearer) for API key, if needed +# configuration.api_key_prefix['Bearer'] = 'Bearer' + + +# Enter a context with an instance of the API client +with client.ApiClient(configuration) as api_client: + # Create an instance of the API class + api_instance = client.AlertApi(api_client) + + try: + # Lists teams for alert notification channel.
+ api_response = api_instance.alerts_teams_get() + print("The response of AlertApi->alerts_teams_get:\n") + pprint(api_response) + except ApiException as e: + print("Exception when calling AlertApi->alerts_teams_get: %s\n" % e) + +``` + +## Documentation for API Endpoints + +All URIs are relative to *http://localhost:8080/v1* + +Class | Method | HTTP request | Description +------------ | ------------- | ------------- | ------------- +*AlertApi* | [**alerts_teams_get**](client/docs/AlertApi.md#alerts_teams_get) | **GET** /alerts/teams | Lists teams for alert notification channel. +*AlertApi* | [**models_model_id_alerts_get**](client/docs/AlertApi.md#models_model_id_alerts_get) | **GET** /models/{model_id}/alerts | Lists alerts for given model. +*AlertApi* | [**models_model_id_endpoints_model_endpoint_id_alert_get**](client/docs/AlertApi.md#models_model_id_endpoints_model_endpoint_id_alert_get) | **GET** /models/{model_id}/endpoints/{model_endpoint_id}/alert | Gets alert for given model endpoint. +*AlertApi* | [**models_model_id_endpoints_model_endpoint_id_alert_post**](client/docs/AlertApi.md#models_model_id_endpoints_model_endpoint_id_alert_post) | **POST** /models/{model_id}/endpoints/{model_endpoint_id}/alert | Creates alert for given model endpoint. +*AlertApi* | [**models_model_id_endpoints_model_endpoint_id_alert_put**](client/docs/AlertApi.md#models_model_id_endpoints_model_endpoint_id_alert_put) | **PUT** /models/{model_id}/endpoints/{model_endpoint_id}/alert | Creates alert for given model endpoint. 
+*EndpointApi* | [**models_model_id_versions_version_id_endpoint_endpoint_id_containers_get**](client/docs/EndpointApi.md#models_model_id_versions_version_id_endpoint_endpoint_id_containers_get) | **GET** /models/{model_id}/versions/{version_id}/endpoint/{endpoint_id}/containers | Get all containers belonging to a version endpoint +*EndpointApi* | [**models_model_id_versions_version_id_endpoint_endpoint_id_delete**](client/docs/EndpointApi.md#models_model_id_versions_version_id_endpoint_endpoint_id_delete) | **DELETE** /models/{model_id}/versions/{version_id}/endpoint/{endpoint_id} | Undeploy the specified model version deployment +*EndpointApi* | [**models_model_id_versions_version_id_endpoint_endpoint_id_get**](client/docs/EndpointApi.md#models_model_id_versions_version_id_endpoint_endpoint_id_get) | **GET** /models/{model_id}/versions/{version_id}/endpoint/{endpoint_id} | Get version endpoint resource +*EndpointApi* | [**models_model_id_versions_version_id_endpoint_endpoint_id_put**](client/docs/EndpointApi.md#models_model_id_versions_version_id_endpoint_endpoint_id_put) | **PUT** /models/{model_id}/versions/{version_id}/endpoint/{endpoint_id} | Modify version endpoint, this API will redeploy the associated deployment +*EndpointApi* | [**models_model_id_versions_version_id_endpoint_get**](client/docs/EndpointApi.md#models_model_id_versions_version_id_endpoint_get) | **GET** /models/{model_id}/versions/{version_id}/endpoint | List all endpoints of a model version +*EndpointApi* | [**models_model_id_versions_version_id_endpoint_post**](client/docs/EndpointApi.md#models_model_id_versions_version_id_endpoint_post) | **POST** /models/{model_id}/versions/{version_id}/endpoint | Deploy a specific version of the model +*EnvironmentApi* | [**environments_get**](client/docs/EnvironmentApi.md#environments_get) | **GET** /environments | List available environments +*LogApi* | [**logs_get**](client/docs/LogApi.md#logs_get) | **GET** /logs | Retrieve logs from a container
+*ModelEndpointsApi* | [**models_model_id_endpoints_get**](client/docs/ModelEndpointsApi.md#models_model_id_endpoints_get) | **GET** /models/{model_id}/endpoints | List model endpoint +*ModelEndpointsApi* | [**models_model_id_endpoints_model_endpoint_id_delete**](client/docs/ModelEndpointsApi.md#models_model_id_endpoints_model_endpoint_id_delete) | **DELETE** /models/{model_id}/endpoints/{model_endpoint_id} | Stop serving traffic to the model endpoint, then delete it. +*ModelEndpointsApi* | [**models_model_id_endpoints_model_endpoint_id_get**](client/docs/ModelEndpointsApi.md#models_model_id_endpoints_model_endpoint_id_get) | **GET** /models/{model_id}/endpoints/{model_endpoint_id} | Get a model endpoint +*ModelEndpointsApi* | [**models_model_id_endpoints_model_endpoint_id_put**](client/docs/ModelEndpointsApi.md#models_model_id_endpoints_model_endpoint_id_put) | **PUT** /models/{model_id}/endpoints/{model_endpoint_id} | Update model endpoint data. Mainly used to update its rule. +*ModelEndpointsApi* | [**models_model_id_endpoints_post**](client/docs/ModelEndpointsApi.md#models_model_id_endpoints_post) | **POST** /models/{model_id}/endpoints | Create a model endpoint +*ModelEndpointsApi* | [**projects_project_id_model_endpoints_get**](client/docs/ModelEndpointsApi.md#projects_project_id_model_endpoints_get) | **GET** /projects/{project_id}/model_endpoints | List existing model endpoints for all models in particular project +*ModelSchemaApi* | [**models_model_id_schemas_get**](client/docs/ModelSchemaApi.md#models_model_id_schemas_get) | **GET** /models/{model_id}/schemas | List all of the model schemas +*ModelSchemaApi* | [**models_model_id_schemas_put**](client/docs/ModelSchemaApi.md#models_model_id_schemas_put) | **PUT** /models/{model_id}/schemas | Creating new schemas for a model +*ModelSchemaApi* | [**models_model_id_schemas_schema_id_delete**](client/docs/ModelSchemaApi.md#models_model_id_schemas_schema_id_delete) | **DELETE** 
/models/{model_id}/schemas/{schema_id} | Delete schema +*ModelSchemaApi* | [**models_model_id_schemas_schema_id_get**](client/docs/ModelSchemaApi.md#models_model_id_schemas_schema_id_get) | **GET** /models/{model_id}/schemas/{schema_id} | Get detail of the schema +*ModelsApi* | [**alerts_teams_get**](client/docs/ModelsApi.md#alerts_teams_get) | **GET** /alerts/teams | Lists teams for alert notification channel. +*ModelsApi* | [**models_model_id_alerts_get**](client/docs/ModelsApi.md#models_model_id_alerts_get) | **GET** /models/{model_id}/alerts | Lists alerts for given model. +*ModelsApi* | [**models_model_id_endpoints_model_endpoint_id_alert_get**](client/docs/ModelsApi.md#models_model_id_endpoints_model_endpoint_id_alert_get) | **GET** /models/{model_id}/endpoints/{model_endpoint_id}/alert | Gets alert for given model endpoint. +*ModelsApi* | [**models_model_id_endpoints_model_endpoint_id_alert_post**](client/docs/ModelsApi.md#models_model_id_endpoints_model_endpoint_id_alert_post) | **POST** /models/{model_id}/endpoints/{model_endpoint_id}/alert | Creates alert for given model endpoint. +*ModelsApi* | [**models_model_id_endpoints_model_endpoint_id_alert_put**](client/docs/ModelsApi.md#models_model_id_endpoints_model_endpoint_id_alert_put) | **PUT** /models/{model_id}/endpoints/{model_endpoint_id}/alert | Creates alert for given model endpoint. 
+*ModelsApi* | [**projects_project_id_models_get**](client/docs/ModelsApi.md#projects_project_id_models_get) | **GET** /projects/{project_id}/models | List existing models +*ModelsApi* | [**projects_project_id_models_model_id_delete**](client/docs/ModelsApi.md#projects_project_id_models_model_id_delete) | **DELETE** /projects/{project_id}/models/{model_id} | Delete model +*ModelsApi* | [**projects_project_id_models_model_id_get**](client/docs/ModelsApi.md#projects_project_id_models_model_id_get) | **GET** /projects/{project_id}/models/{model_id} | Get model +*ModelsApi* | [**projects_project_id_models_post**](client/docs/ModelsApi.md#projects_project_id_models_post) | **POST** /projects/{project_id}/models | Register a new model +*PredictionJobsApi* | [**models_model_id_versions_version_id_jobs_get**](client/docs/PredictionJobsApi.md#models_model_id_versions_version_id_jobs_get) | **GET** /models/{model_id}/versions/{version_id}/jobs | List all prediction jobs of a model version +*PredictionJobsApi* | [**models_model_id_versions_version_id_jobs_job_id_containers_get**](client/docs/PredictionJobsApi.md#models_model_id_versions_version_id_jobs_job_id_containers_get) | **GET** /models/{model_id}/versions/{version_id}/jobs/{job_id}/containers | Get all containers belonging to a prediction job +*PredictionJobsApi* | [**models_model_id_versions_version_id_jobs_job_id_get**](client/docs/PredictionJobsApi.md#models_model_id_versions_version_id_jobs_job_id_get) | **GET** /models/{model_id}/versions/{version_id}/jobs/{job_id} | Get prediction jobs with given id +*PredictionJobsApi* | [**models_model_id_versions_version_id_jobs_job_id_stop_put**](client/docs/PredictionJobsApi.md#models_model_id_versions_version_id_jobs_job_id_stop_put) | **PUT** /models/{model_id}/versions/{version_id}/jobs/{job_id}/stop | Stop prediction jobs with given id +*PredictionJobsApi* |
[**models_model_id_versions_version_id_jobs_post**](client/docs/PredictionJobsApi.md#models_model_id_versions_version_id_jobs_post) | **POST** /models/{model_id}/versions/{version_id}/jobs | Create a prediction job from the given model version +*PredictionJobsApi* | [**projects_project_id_jobs_get**](client/docs/PredictionJobsApi.md#projects_project_id_jobs_get) | **GET** /projects/{project_id}/jobs | List all prediction jobs created using the model +*ProjectApi* | [**projects_get**](client/docs/ProjectApi.md#projects_get) | **GET** /projects | List existing projects +*ProjectApi* | [**projects_post**](client/docs/ProjectApi.md#projects_post) | **POST** /projects | Create new project +*ProjectApi* | [**projects_project_id_get**](client/docs/ProjectApi.md#projects_project_id_get) | **GET** /projects/{project_id} | Get project +*ProjectApi* | [**projects_project_id_put**](client/docs/ProjectApi.md#projects_project_id_put) | **PUT** /projects/{project_id} | Update project +*SecretApi* | [**projects_project_id_secrets_get**](client/docs/SecretApi.md#projects_project_id_secrets_get) | **GET** /projects/{project_id}/secrets | List secret +*SecretApi* | [**projects_project_id_secrets_post**](client/docs/SecretApi.md#projects_project_id_secrets_post) | **POST** /projects/{project_id}/secrets | Create secret +*SecretApi* | [**projects_project_id_secrets_secret_id_delete**](client/docs/SecretApi.md#projects_project_id_secrets_secret_id_delete) | **DELETE** /projects/{project_id}/secrets/{secret_id} | Delete secret +*SecretApi* | [**projects_project_id_secrets_secret_id_patch**](client/docs/SecretApi.md#projects_project_id_secrets_secret_id_patch) | **PATCH** /projects/{project_id}/secrets/{secret_id} | Update secret +*StandardTransformerApi* | [**standard_transformer_simulate_post**](client/docs/StandardTransformerApi.md#standard_transformer_simulate_post) | **POST** /standard_transformer/simulate | Simulate standard transformer +*VersionApi* | 
[**models_model_id_versions_get**](client/docs/VersionApi.md#models_model_id_versions_get) | **GET** /models/{model_id}/versions | List versions of the models +*VersionApi* | [**models_model_id_versions_post**](client/docs/VersionApi.md#models_model_id_versions_post) | **POST** /models/{model_id}/versions | Log a new version of the models +*VersionApi* | [**models_model_id_versions_version_id_delete**](client/docs/VersionApi.md#models_model_id_versions_version_id_delete) | **DELETE** /models/{model_id}/versions/{version_id} | Delete version by ID from model +*VersionApi* | [**models_model_id_versions_version_id_get**](client/docs/VersionApi.md#models_model_id_versions_version_id_get) | **GET** /models/{model_id}/versions/{version_id} | Get version by ID from model +*VersionApi* | [**models_model_id_versions_version_id_patch**](client/docs/VersionApi.md#models_model_id_versions_version_id_patch) | **PATCH** /models/{model_id}/versions/{version_id} | Patch the version + + +## Documentation For Models + + - [AlertConditionMetricType](client/docs/AlertConditionMetricType.md) + - [AlertConditionSeverity](client/docs/AlertConditionSeverity.md) + - [AutoscalingPolicy](client/docs/AutoscalingPolicy.md) + - [BinaryClassificationOutput](client/docs/BinaryClassificationOutput.md) + - [Config](client/docs/Config.md) + - [Container](client/docs/Container.md) + - [CustomPredictor](client/docs/CustomPredictor.md) + - [DeploymentMode](client/docs/DeploymentMode.md) + - [EndpointStatus](client/docs/EndpointStatus.md) + - [EnvVar](client/docs/EnvVar.md) + - [Environment](client/docs/Environment.md) + - [FileFormat](client/docs/FileFormat.md) + - [GPUConfig](client/docs/GPUConfig.md) + - [GPUToleration](client/docs/GPUToleration.md) + - [Label](client/docs/Label.md) + - [Logger](client/docs/Logger.md) + - [LoggerConfig](client/docs/LoggerConfig.md) + - [LoggerMode](client/docs/LoggerMode.md) + - [MetricsType](client/docs/MetricsType.md) + - [MockResponse](client/docs/MockResponse.md) 
+ - [Model](client/docs/Model.md) + - [ModelEndpoint](client/docs/ModelEndpoint.md) + - [ModelEndpointAlert](client/docs/ModelEndpointAlert.md) + - [ModelEndpointAlertCondition](client/docs/ModelEndpointAlertCondition.md) + - [ModelEndpointRule](client/docs/ModelEndpointRule.md) + - [ModelEndpointRuleDestination](client/docs/ModelEndpointRuleDestination.md) + - [ModelPredictionConfig](client/docs/ModelPredictionConfig.md) + - [ModelPredictionOutput](client/docs/ModelPredictionOutput.md) + - [ModelPredictionOutputClass](client/docs/ModelPredictionOutputClass.md) + - [ModelSchema](client/docs/ModelSchema.md) + - [OperationTracing](client/docs/OperationTracing.md) + - [PipelineTracing](client/docs/PipelineTracing.md) + - [PredictionJob](client/docs/PredictionJob.md) + - [PredictionJobConfig](client/docs/PredictionJobConfig.md) + - [PredictionJobConfigBigquerySink](client/docs/PredictionJobConfigBigquerySink.md) + - [PredictionJobConfigBigquerySource](client/docs/PredictionJobConfigBigquerySource.md) + - [PredictionJobConfigGcsSink](client/docs/PredictionJobConfigGcsSink.md) + - [PredictionJobConfigGcsSource](client/docs/PredictionJobConfigGcsSource.md) + - [PredictionJobConfigModel](client/docs/PredictionJobConfigModel.md) + - [PredictionJobConfigModelResult](client/docs/PredictionJobConfigModelResult.md) + - [PredictionJobResourceRequest](client/docs/PredictionJobResourceRequest.md) + - [PredictionLoggerConfig](client/docs/PredictionLoggerConfig.md) + - [Project](client/docs/Project.md) + - [Protocol](client/docs/Protocol.md) + - [RankingOutput](client/docs/RankingOutput.md) + - [RegressionOutput](client/docs/RegressionOutput.md) + - [ResourceRequest](client/docs/ResourceRequest.md) + - [ResultType](client/docs/ResultType.md) + - [SaveMode](client/docs/SaveMode.md) + - [SchemaSpec](client/docs/SchemaSpec.md) + - [Secret](client/docs/Secret.md) + - [StandardTransformerSimulationRequest](client/docs/StandardTransformerSimulationRequest.md) + - 
[StandardTransformerSimulationResponse](client/docs/StandardTransformerSimulationResponse.md) + - [Transformer](client/docs/Transformer.md) + - [ValueType](client/docs/ValueType.md) + - [Version](client/docs/Version.md) + - [VersionEndpoint](client/docs/VersionEndpoint.md) + + + +## Documentation For Authorization + + +Authentication schemes defined for the API: + +### Bearer + +- **Type**: API key +- **API key parameter name**: Authorization +- **Location**: HTTP header + + +## Author + + + + diff --git a/python/sdk/merlin/observability/inference.py b/python/sdk/merlin/observability/inference.py index 0a067c542..dc6287069 100644 --- a/python/sdk/merlin/observability/inference.py +++ b/python/sdk/merlin/observability/inference.py @@ -80,13 +80,15 @@ def decode(cls, input: Dict): be made to the input dataframe. :param df: Input dataframe. + :param session_id_column: Name of the column containing the session id. + :param row_id_column: Name of the column containing the row id. :param observation_types: Types of observations to be included in the output dataframe. :return: output dataframe """ @abc.abstractmethod def preprocess( - self, df: pd.DataFrame, observation_types: List[ObservationType] + self, df: pd.DataFrame, session_id_column: str, row_id_column: str, observation_types: List[ObservationType] ) -> pd.DataFrame: raise NotImplementedError @@ -120,7 +122,7 @@ class RegressionOutput(PredictionOutput): actual_score_column: str def preprocess( - self, df: pd.DataFrame, observation_types: List[ObservationType] + self, df: pd.DataFrame, session_id_column: str, row_id_column: str, observation_types: List[ObservationType] ) -> pd.DataFrame: return df @@ -144,14 +146,14 @@ class BinaryClassificationOutput(PredictionOutput): Attributes: prediction_score_column: Name of the column containing the prediction score. Prediction score must be between 0.0 and 1.0. - actual_label_column: Name of the column containing the actual class. 
+ actual_score_column: Name of the column containing the actual score. positive_class_label: Label for positive class. negative_class_label: Label for negative class. score_threshold: Score threshold for prediction to be considered as positive class. """ prediction_score_column: str - actual_label_column: str + actual_score_column: str positive_class_label: str negative_class_label: str score_threshold: float = 0.5 @@ -161,8 +163,8 @@ def prediction_label_column(self) -> str: return "_prediction_label" @property - def actual_score_column(self) -> str: - return "_actual_score" + def actual_label_column(self) -> str: + return "_actual_label" def prediction_label(self, prediction_score: float) -> str: """ @@ -176,28 +178,28 @@ def prediction_label(self, prediction_score: float) -> str: else self.negative_class_label ) - def actual_score(self, actual_label: str) -> float: + def actual_label(self, actual_score: int) -> str: """ - Derive actual score from actual label. - :param actual_label: Actual label. - :return: actual score. Either 0.0 for negative class or 1.0 for positive class. + Derive actual label from actual score. + :param actual_score: Actual score. 
+ :return: actual label """ - if actual_label not in [self.positive_class_label, self.negative_class_label]: - raise ValueError( - f"Actual label must be one of the classes, got {actual_label}" - ) - return 1.0 if actual_label == self.positive_class_label else 0.0 + return ( + self.positive_class_label + if actual_score >= self.score_threshold + else self.negative_class_label + ) def preprocess( - self, df: pd.DataFrame, observation_types: List[ObservationType] + self, df: pd.DataFrame, session_id_column: str, row_id_column: str, observation_types: List[ObservationType] ) -> pd.DataFrame: if ObservationType.PREDICTION in observation_types: df[self.prediction_label_column] = df[self.prediction_score_column].apply( self.prediction_label ) if ObservationType.GROUND_TRUTH in observation_types: - df[self.actual_score_column] = df[self.actual_label_column].apply( - self.actual_score + df[self.actual_label_column] = df[self.actual_score_column].apply( + self.actual_label ) return df @@ -218,13 +220,11 @@ def ground_truth_types(self) -> Dict[str, ValueType]: @dataclass class RankingOutput(PredictionOutput): rank_score_column: str - prediction_group_id_column: str relevance_score_column: str """ Ranking model prediction output schema. Attributes: rank_score_column: Name of the column containing the ranking score of the prediction. - prediction_group_id_column: Name of the column containing the prediction group id. relevance_score_column: Name of the column containing the relevance score of the prediction. 
""" @@ -233,11 +233,11 @@ def rank_column(self) -> str: return "_rank" def preprocess( - self, df: pd.DataFrame, observation_types: List[ObservationType] + self, df: pd.DataFrame, session_id_column: str, row_id_column: str, observation_types: List[ObservationType] ) -> pd.DataFrame: if ObservationType.PREDICTION in observation_types: df[self.rank_column] = ( - df.groupby(self.prediction_group_id_column)[self.rank_score_column] + df.groupby(session_id_column)[self.rank_score_column] .rank(method="first", ascending=False) .astype(np.int_) ) @@ -245,8 +245,8 @@ def preprocess( def prediction_types(self) -> Dict[str, ValueType]: return { + self.rank_score_column: ValueType.FLOAT64, self.rank_column: ValueType.INT64, - self.prediction_group_id_column: ValueType.STRING, } def ground_truth_types(self) -> Dict[str, ValueType]: @@ -265,6 +265,8 @@ class InferenceSchema: decoder=PredictionOutput.decode, ) ) + session_id_column: str = "session_id" + row_id_column: str = "row_id" prediction_id_column: str = "prediction_id" tag_columns: Optional[List[str]] = None @@ -279,3 +281,12 @@ def prediction_columns(self) -> List[str]: @property def ground_truth_columns(self) -> List[str]: return list(self.model_prediction_output.ground_truth_types().keys()) + + def preprocess( + self, df: pd.DataFrame, observation_types: List[ObservationType] + ) -> pd.DataFrame: + if observation_types != ObservationType.FEATURE: + df[self.prediction_id_column] = df[self.session_id_column] + "_" + df[self.row_id_column] + return self.model_prediction_output.preprocess( + df, self.session_id_column, self.row_id_column, observation_types + ) diff --git a/python/sdk/test/observability_test.py b/python/sdk/test/observability_test.py index a670a854e..1241a705c 100644 --- a/python/sdk/test/observability_test.py +++ b/python/sdk/test/observability_test.py @@ -8,7 +8,7 @@ def binary_classification_output() -> BinaryClassificationOutput: return BinaryClassificationOutput( 
prediction_score_column="prediction_score", - actual_label_column="target", + actual_score_column="actual_score", positive_class_label="ACCEPTED", negative_class_label="REJECTED", score_threshold=0.5, @@ -21,7 +21,7 @@ def test_prediction_output_encoding(binary_classification_output: BinaryClassifi assert encoded_output == { "output_class": "BinaryClassificationOutput", "prediction_score_column": "prediction_score", - "actual_label_column": "target", + "actual_score_column": "actual_score", "positive_class_label": "ACCEPTED", "negative_class_label": "REJECTED", "score_threshold": 0.5, @@ -35,31 +35,35 @@ def test_prediction_output_encoding(binary_classification_output: BinaryClassifi def test_binary_classification_preprocessing(binary_classification_output: BinaryClassificationOutput): input_df = pd.DataFrame.from_records( [ - [0.8, "ACCEPTED"], - [0.5, "ACCEPTED"], - [1.0, "REJECTED"], - [0.4, "ACCEPTED"] + ["9001", "1001", 0.8, 1.0], + ["9002", "1001", 0.5, 1.0], + ["9002", "1002", 1.0, 0.0], + ["9003", "1003", 0.4, 1.0] ], columns=[ + "order_id", + "driver_id", "prediction_score", - "target" + "actual_score" ], ) - processed_df = binary_classification_output.preprocess(input_df, [ObservationType.PREDICTION, ObservationType.GROUND_TRUTH]) + processed_df = binary_classification_output.preprocess(input_df, "order_id", "driver_id", [ObservationType.PREDICTION, ObservationType.GROUND_TRUTH]) pd.testing.assert_frame_equal( processed_df, pd.DataFrame.from_records( [ - [0.8, "ACCEPTED" , "ACCEPTED", 1.0], - [0.5, "ACCEPTED", "ACCEPTED", 1.0], - [1.0, "REJECTED", "ACCEPTED", 0.0], - [0.4, "ACCEPTED", "REJECTED", 1.0] + ["9001", "1001", 0.8, 1.0, "ACCEPTED" , "ACCEPTED"], + ["9002", "1001", 0.5, 1.0, "ACCEPTED", "ACCEPTED"], + ["9002", "1002", 1.0, 0.0, "ACCEPTED", "REJECTED"], + ["9003", "1003", 0.4, 1.0, "REJECTED", "ACCEPTED"] ], columns=[ + "order_id", + "driver_id", "prediction_score", - "target", + "actual_score", "_prediction_label", - "_actual_score", + 
"_actual_label", ], ), ) @@ -69,7 +73,6 @@ def test_binary_classification_preprocessing(binary_classification_output: Binar def ranking_prediction_output() -> RankingOutput: return RankingOutput( rank_score_column="score", - prediction_group_id_column="order_id", relevance_score_column="relevance_score_column", ) @@ -78,33 +81,35 @@ def ranking_prediction_output() -> RankingOutput: def test_ranking_preprocessing(ranking_prediction_output: RankingOutput): input_df = pd.DataFrame.from_records( [ - [1.0, "1001"], - [0.8, "1001"], - [0.1, "1001"], - [0.3, "1002"], - [0.2, "1002"], - [0.1, "1002"], + [1.0, "1001", "d1"], + [0.8, "1001", "d2"], + [0.1, "1001", "d3"], + [0.3, "1002", "d1"], + [0.2, "1002", "d2"], + [0.1, "1002", "d3"], ], columns=[ "score", "order_id", + "driver_id" ], ) - processed_df = ranking_prediction_output.preprocess(input_df, [ObservationType.PREDICTION]) + processed_df = ranking_prediction_output.preprocess(input_df, "order_id", "driver_id", [ObservationType.PREDICTION]) pd.testing.assert_frame_equal( processed_df, pd.DataFrame.from_records( [ - [1.0, "1001", 1], - [0.8, "1001", 2], - [0.1, "1001", 3], - [0.3, "1002", 1], - [0.2, "1002", 2], - [0.1, "1002", 3], + [1.0, "1001", "d1", 1], + [0.8, "1001", "d2", 2], + [0.1, "1001", "d3", 3], + [0.3, "1002", "d1", 1], + [0.2, "1002", "d2", 2], + [0.1, "1002", "d3", 3], ], columns=[ "score", "order_id", + "driver_id", "_rank", ], ), diff --git a/swagger.yaml b/swagger.yaml index 3bf626b5a..e83dd57f0 100644 --- a/swagger.yaml +++ b/swagger.yaml @@ -1380,12 +1380,15 @@ components: SchemaSpec: type: object required: - - prediction_id_column - model_prediction_output - feature_types properties: prediction_id_column: type: string + session_id_column: + type: string + row_id_column: + type: string model_prediction_output: $ref : '#/components/schemas/ModelPredictionOutput' tag_columns: @@ -1427,7 +1430,7 @@ components: properties: prediction_score_column: type: string - actual_label_column: + 
actual_score_column: type: string positive_class_label: type: string @@ -1442,13 +1445,10 @@ components: type: object required: - rank_score_column - - prediction_group_id_column - output_class properties: rank_score_column: type: string - prediction_group_id_column: - type: string relevance_score_column: type: string output_class: