diff --git a/api/client/model_binary_classification_output.go b/api/client/model_binary_classification_output.go
index a75023589..a93108ed5 100644
--- a/api/client/model_binary_classification_output.go
+++ b/api/client/model_binary_classification_output.go
@@ -21,7 +21,7 @@ var _ MappedNullable = &BinaryClassificationOutput{}
// BinaryClassificationOutput struct for BinaryClassificationOutput
type BinaryClassificationOutput struct {
PredictionScoreColumn string `json:"prediction_score_column"`
- ActualLabelColumn *string `json:"actual_label_column,omitempty"`
+ ActualScoreColumn *string `json:"actual_score_column,omitempty"`
PositiveClassLabel string `json:"positive_class_label"`
NegativeClassLabel string `json:"negative_class_label"`
ScoreThreshold *float32 `json:"score_threshold,omitempty"`
@@ -75,36 +75,36 @@ func (o *BinaryClassificationOutput) SetPredictionScoreColumn(v string) {
o.PredictionScoreColumn = v
}
-// GetActualLabelColumn returns the ActualLabelColumn field value if set, zero value otherwise.
-func (o *BinaryClassificationOutput) GetActualLabelColumn() string {
- if o == nil || IsNil(o.ActualLabelColumn) {
+// GetActualScoreColumn returns the ActualScoreColumn field value if set, zero value otherwise.
+func (o *BinaryClassificationOutput) GetActualScoreColumn() string {
+ if o == nil || IsNil(o.ActualScoreColumn) {
var ret string
return ret
}
- return *o.ActualLabelColumn
+ return *o.ActualScoreColumn
}
-// GetActualLabelColumnOk returns a tuple with the ActualLabelColumn field value if set, nil otherwise
+// GetActualScoreColumnOk returns a tuple with the ActualScoreColumn field value if set, nil otherwise
// and a boolean to check if the value has been set.
-func (o *BinaryClassificationOutput) GetActualLabelColumnOk() (*string, bool) {
- if o == nil || IsNil(o.ActualLabelColumn) {
+func (o *BinaryClassificationOutput) GetActualScoreColumnOk() (*string, bool) {
+ if o == nil || IsNil(o.ActualScoreColumn) {
return nil, false
}
- return o.ActualLabelColumn, true
+ return o.ActualScoreColumn, true
}
-// HasActualLabelColumn returns a boolean if a field has been set.
-func (o *BinaryClassificationOutput) HasActualLabelColumn() bool {
- if o != nil && !IsNil(o.ActualLabelColumn) {
+// HasActualScoreColumn returns a boolean if a field has been set.
+func (o *BinaryClassificationOutput) HasActualScoreColumn() bool {
+ if o != nil && !IsNil(o.ActualScoreColumn) {
return true
}
return false
}
-// SetActualLabelColumn gets a reference to the given string and assigns it to the ActualLabelColumn field.
-func (o *BinaryClassificationOutput) SetActualLabelColumn(v string) {
- o.ActualLabelColumn = &v
+// SetActualScoreColumn gets a reference to the given string and assigns it to the ActualScoreColumn field.
+func (o *BinaryClassificationOutput) SetActualScoreColumn(v string) {
+ o.ActualScoreColumn = &v
}
// GetPositiveClassLabel returns the PositiveClassLabel field value
@@ -222,8 +222,8 @@ func (o BinaryClassificationOutput) MarshalJSON() ([]byte, error) {
func (o BinaryClassificationOutput) ToMap() (map[string]interface{}, error) {
toSerialize := map[string]interface{}{}
toSerialize["prediction_score_column"] = o.PredictionScoreColumn
- if !IsNil(o.ActualLabelColumn) {
- toSerialize["actual_label_column"] = o.ActualLabelColumn
+ if !IsNil(o.ActualScoreColumn) {
+ toSerialize["actual_score_column"] = o.ActualScoreColumn
}
toSerialize["positive_class_label"] = o.PositiveClassLabel
toSerialize["negative_class_label"] = o.NegativeClassLabel
diff --git a/api/client/model_model_prediction_output.go b/api/client/model_model_prediction_output.go
index 3e0d72e8b..c5313594f 100644
--- a/api/client/model_model_prediction_output.go
+++ b/api/client/model_model_prediction_output.go
@@ -46,58 +46,50 @@ func RegressionOutputAsModelPredictionOutput(v *RegressionOutput) ModelPredictio
// Unmarshal JSON data into one of the pointers in the struct
func (dst *ModelPredictionOutput) UnmarshalJSON(data []byte) error {
var err error
- match := 0
- // try to unmarshal data into BinaryClassificationOutput
- err = newStrictDecoder(data).Decode(&dst.BinaryClassificationOutput)
- if err == nil {
- jsonBinaryClassificationOutput, _ := json.Marshal(dst.BinaryClassificationOutput)
- if string(jsonBinaryClassificationOutput) == "{}" { // empty struct
- dst.BinaryClassificationOutput = nil
+ // use discriminator value to speed up the lookup
+ var jsonDict map[string]interface{}
+ err = newStrictDecoder(data).Decode(&jsonDict)
+ if err != nil {
+		return fmt.Errorf("failed to unmarshal JSON into map for the discriminator lookup: %w", err)
+ }
+
+ // check if the discriminator value is 'BinaryClassificationOutput'
+ if jsonDict["output_class"] == "BinaryClassificationOutput" {
+ // try to unmarshal JSON data into BinaryClassificationOutput
+ err = json.Unmarshal(data, &dst.BinaryClassificationOutput)
+ if err == nil {
+ return nil // data stored in dst.BinaryClassificationOutput, return on the first match
} else {
- match++
+ dst.BinaryClassificationOutput = nil
+ return fmt.Errorf("failed to unmarshal ModelPredictionOutput as BinaryClassificationOutput: %s", err.Error())
}
- } else {
- dst.BinaryClassificationOutput = nil
}
- // try to unmarshal data into RankingOutput
- err = newStrictDecoder(data).Decode(&dst.RankingOutput)
- if err == nil {
- jsonRankingOutput, _ := json.Marshal(dst.RankingOutput)
- if string(jsonRankingOutput) == "{}" { // empty struct
- dst.RankingOutput = nil
+ // check if the discriminator value is 'RankingOutput'
+ if jsonDict["output_class"] == "RankingOutput" {
+ // try to unmarshal JSON data into RankingOutput
+ err = json.Unmarshal(data, &dst.RankingOutput)
+ if err == nil {
+ return nil // data stored in dst.RankingOutput, return on the first match
} else {
- match++
+ dst.RankingOutput = nil
+ return fmt.Errorf("failed to unmarshal ModelPredictionOutput as RankingOutput: %s", err.Error())
}
- } else {
- dst.RankingOutput = nil
}
- // try to unmarshal data into RegressionOutput
- err = newStrictDecoder(data).Decode(&dst.RegressionOutput)
- if err == nil {
- jsonRegressionOutput, _ := json.Marshal(dst.RegressionOutput)
- if string(jsonRegressionOutput) == "{}" { // empty struct
- dst.RegressionOutput = nil
+ // check if the discriminator value is 'RegressionOutput'
+ if jsonDict["output_class"] == "RegressionOutput" {
+ // try to unmarshal JSON data into RegressionOutput
+ err = json.Unmarshal(data, &dst.RegressionOutput)
+ if err == nil {
+ return nil // data stored in dst.RegressionOutput, return on the first match
} else {
- match++
+ dst.RegressionOutput = nil
+ return fmt.Errorf("failed to unmarshal ModelPredictionOutput as RegressionOutput: %s", err.Error())
}
- } else {
- dst.RegressionOutput = nil
}
- if match > 1 { // more than 1 match
- // reset to nil
- dst.BinaryClassificationOutput = nil
- dst.RankingOutput = nil
- dst.RegressionOutput = nil
-
- return fmt.Errorf("data matches more than one schema in oneOf(ModelPredictionOutput)")
- } else if match == 1 {
- return nil // exactly one match
- } else { // no match
- return fmt.Errorf("data failed to match schemas in oneOf(ModelPredictionOutput)")
- }
+	return fmt.Errorf("data failed to match schemas in oneOf(ModelPredictionOutput): unrecognized output_class %v", jsonDict["output_class"])
}
// Marshal data from the first non-nil pointers in the struct to JSON
diff --git a/api/client/model_ranking_output.go b/api/client/model_ranking_output.go
index 93b947066..a339867c8 100644
--- a/api/client/model_ranking_output.go
+++ b/api/client/model_ranking_output.go
@@ -20,10 +20,9 @@ var _ MappedNullable = &RankingOutput{}
// RankingOutput struct for RankingOutput
type RankingOutput struct {
- RankScoreColumn string `json:"rank_score_column"`
- PredictionGroupIdColumn string `json:"prediction_group_id_column"`
- RelevanceScoreColumn *string `json:"relevance_score_column,omitempty"`
- OutputClass ModelPredictionOutputClass `json:"output_class"`
+ RankScoreColumn string `json:"rank_score_column"`
+ RelevanceScoreColumn *string `json:"relevance_score_column,omitempty"`
+ OutputClass ModelPredictionOutputClass `json:"output_class"`
}
type _RankingOutput RankingOutput
@@ -32,10 +31,9 @@ type _RankingOutput RankingOutput
// This constructor will assign default values to properties that have it defined,
// and makes sure properties required by API are set, but the set of arguments
// will change when the set of required properties is changed
-func NewRankingOutput(rankScoreColumn string, predictionGroupIdColumn string, outputClass ModelPredictionOutputClass) *RankingOutput {
+func NewRankingOutput(rankScoreColumn string, outputClass ModelPredictionOutputClass) *RankingOutput {
this := RankingOutput{}
this.RankScoreColumn = rankScoreColumn
- this.PredictionGroupIdColumn = predictionGroupIdColumn
this.OutputClass = outputClass
return &this
}
@@ -72,30 +70,6 @@ func (o *RankingOutput) SetRankScoreColumn(v string) {
o.RankScoreColumn = v
}
-// GetPredictionGroupIdColumn returns the PredictionGroupIdColumn field value
-func (o *RankingOutput) GetPredictionGroupIdColumn() string {
- if o == nil {
- var ret string
- return ret
- }
-
- return o.PredictionGroupIdColumn
-}
-
-// GetPredictionGroupIdColumnOk returns a tuple with the PredictionGroupIdColumn field value
-// and a boolean to check if the value has been set.
-func (o *RankingOutput) GetPredictionGroupIdColumnOk() (*string, bool) {
- if o == nil {
- return nil, false
- }
- return &o.PredictionGroupIdColumn, true
-}
-
-// SetPredictionGroupIdColumn sets field value
-func (o *RankingOutput) SetPredictionGroupIdColumn(v string) {
- o.PredictionGroupIdColumn = v
-}
-
// GetRelevanceScoreColumn returns the RelevanceScoreColumn field value if set, zero value otherwise.
func (o *RankingOutput) GetRelevanceScoreColumn() string {
if o == nil || IsNil(o.RelevanceScoreColumn) {
@@ -163,7 +137,6 @@ func (o RankingOutput) MarshalJSON() ([]byte, error) {
func (o RankingOutput) ToMap() (map[string]interface{}, error) {
toSerialize := map[string]interface{}{}
toSerialize["rank_score_column"] = o.RankScoreColumn
- toSerialize["prediction_group_id_column"] = o.PredictionGroupIdColumn
if !IsNil(o.RelevanceScoreColumn) {
toSerialize["relevance_score_column"] = o.RelevanceScoreColumn
}
@@ -177,7 +150,6 @@ func (o *RankingOutput) UnmarshalJSON(bytes []byte) (err error) {
// that every required field exists as a key in the generic map.
requiredProperties := []string{
"rank_score_column",
- "prediction_group_id_column",
"output_class",
}
diff --git a/api/client/model_schema_spec.go b/api/client/model_schema_spec.go
index ec474cb10..9616b1e84 100644
--- a/api/client/model_schema_spec.go
+++ b/api/client/model_schema_spec.go
@@ -20,7 +20,9 @@ var _ MappedNullable = &SchemaSpec{}
// SchemaSpec struct for SchemaSpec
type SchemaSpec struct {
- PredictionIdColumn string `json:"prediction_id_column"`
+ PredictionIdColumn *string `json:"prediction_id_column,omitempty"`
+ SessionIdColumn *string `json:"session_id_column,omitempty"`
+ RowIdColumn *string `json:"row_id_column,omitempty"`
ModelPredictionOutput ModelPredictionOutput `json:"model_prediction_output"`
TagColumns []string `json:"tag_columns,omitempty"`
FeatureTypes map[string]ValueType `json:"feature_types"`
@@ -32,9 +34,8 @@ type _SchemaSpec SchemaSpec
// This constructor will assign default values to properties that have it defined,
// and makes sure properties required by API are set, but the set of arguments
// will change when the set of required properties is changed
-func NewSchemaSpec(predictionIdColumn string, modelPredictionOutput ModelPredictionOutput, featureTypes map[string]ValueType) *SchemaSpec {
+func NewSchemaSpec(modelPredictionOutput ModelPredictionOutput, featureTypes map[string]ValueType) *SchemaSpec {
this := SchemaSpec{}
- this.PredictionIdColumn = predictionIdColumn
this.ModelPredictionOutput = modelPredictionOutput
this.FeatureTypes = featureTypes
return &this
@@ -48,28 +49,100 @@ func NewSchemaSpecWithDefaults() *SchemaSpec {
return &this
}
-// GetPredictionIdColumn returns the PredictionIdColumn field value
+// GetPredictionIdColumn returns the PredictionIdColumn field value if set, zero value otherwise.
func (o *SchemaSpec) GetPredictionIdColumn() string {
- if o == nil {
+ if o == nil || IsNil(o.PredictionIdColumn) {
var ret string
return ret
}
-
- return o.PredictionIdColumn
+ return *o.PredictionIdColumn
}
-// GetPredictionIdColumnOk returns a tuple with the PredictionIdColumn field value
+// GetPredictionIdColumnOk returns a tuple with the PredictionIdColumn field value if set, nil otherwise
// and a boolean to check if the value has been set.
func (o *SchemaSpec) GetPredictionIdColumnOk() (*string, bool) {
- if o == nil {
+ if o == nil || IsNil(o.PredictionIdColumn) {
return nil, false
}
- return &o.PredictionIdColumn, true
+ return o.PredictionIdColumn, true
+}
+
+// HasPredictionIdColumn returns a boolean if a field has been set.
+func (o *SchemaSpec) HasPredictionIdColumn() bool {
+ if o != nil && !IsNil(o.PredictionIdColumn) {
+ return true
+ }
+
+ return false
}
-// SetPredictionIdColumn sets field value
+// SetPredictionIdColumn gets a reference to the given string and assigns it to the PredictionIdColumn field.
func (o *SchemaSpec) SetPredictionIdColumn(v string) {
- o.PredictionIdColumn = v
+ o.PredictionIdColumn = &v
+}
+
+// GetSessionIdColumn returns the SessionIdColumn field value if set, zero value otherwise.
+func (o *SchemaSpec) GetSessionIdColumn() string {
+ if o == nil || IsNil(o.SessionIdColumn) {
+ var ret string
+ return ret
+ }
+ return *o.SessionIdColumn
+}
+
+// GetSessionIdColumnOk returns a tuple with the SessionIdColumn field value if set, nil otherwise
+// and a boolean to check if the value has been set.
+func (o *SchemaSpec) GetSessionIdColumnOk() (*string, bool) {
+ if o == nil || IsNil(o.SessionIdColumn) {
+ return nil, false
+ }
+ return o.SessionIdColumn, true
+}
+
+// HasSessionIdColumn returns a boolean if a field has been set.
+func (o *SchemaSpec) HasSessionIdColumn() bool {
+ if o != nil && !IsNil(o.SessionIdColumn) {
+ return true
+ }
+
+ return false
+}
+
+// SetSessionIdColumn gets a reference to the given string and assigns it to the SessionIdColumn field.
+func (o *SchemaSpec) SetSessionIdColumn(v string) {
+ o.SessionIdColumn = &v
+}
+
+// GetRowIdColumn returns the RowIdColumn field value if set, zero value otherwise.
+func (o *SchemaSpec) GetRowIdColumn() string {
+ if o == nil || IsNil(o.RowIdColumn) {
+ var ret string
+ return ret
+ }
+ return *o.RowIdColumn
+}
+
+// GetRowIdColumnOk returns a tuple with the RowIdColumn field value if set, nil otherwise
+// and a boolean to check if the value has been set.
+func (o *SchemaSpec) GetRowIdColumnOk() (*string, bool) {
+ if o == nil || IsNil(o.RowIdColumn) {
+ return nil, false
+ }
+ return o.RowIdColumn, true
+}
+
+// HasRowIdColumn returns a boolean if a field has been set.
+func (o *SchemaSpec) HasRowIdColumn() bool {
+ if o != nil && !IsNil(o.RowIdColumn) {
+ return true
+ }
+
+ return false
+}
+
+// SetRowIdColumn gets a reference to the given string and assigns it to the RowIdColumn field.
+func (o *SchemaSpec) SetRowIdColumn(v string) {
+ o.RowIdColumn = &v
}
// GetModelPredictionOutput returns the ModelPredictionOutput field value
@@ -162,7 +235,15 @@ func (o SchemaSpec) MarshalJSON() ([]byte, error) {
func (o SchemaSpec) ToMap() (map[string]interface{}, error) {
toSerialize := map[string]interface{}{}
- toSerialize["prediction_id_column"] = o.PredictionIdColumn
+ if !IsNil(o.PredictionIdColumn) {
+ toSerialize["prediction_id_column"] = o.PredictionIdColumn
+ }
+ if !IsNil(o.SessionIdColumn) {
+ toSerialize["session_id_column"] = o.SessionIdColumn
+ }
+ if !IsNil(o.RowIdColumn) {
+ toSerialize["row_id_column"] = o.RowIdColumn
+ }
toSerialize["model_prediction_output"] = o.ModelPredictionOutput
if !IsNil(o.TagColumns) {
toSerialize["tag_columns"] = o.TagColumns
@@ -176,7 +257,6 @@ func (o *SchemaSpec) UnmarshalJSON(bytes []byte) (err error) {
// by unmarshalling the object into a generic map with string keys and checking
// that every required field exists as a key in the generic map.
requiredProperties := []string{
- "prediction_id_column",
"model_prediction_output",
"feature_types",
}
diff --git a/python/observation-publisher/publisher/observation_sink.py b/python/observation-publisher/publisher/observation_sink.py
index a09ddbb6c..b315e815b 100644
--- a/python/observation-publisher/publisher/observation_sink.py
+++ b/python/observation-publisher/publisher/observation_sink.py
@@ -15,14 +15,13 @@
from google.cloud.bigquery import (SchemaField, Table, TimePartitioning,
TimePartitioningType)
from merlin.observability.inference import (BinaryClassificationOutput,
- InferenceSchema, ObservationType,
+ InferenceSchema,
RankingOutput, RegressionOutput,
ValueType)
from publisher.config import ObservationSinkConfig, ObservationSinkType
-from publisher.prediction_log_parser import (MODEL_VERSION_COLUMN,
- PREDICTION_LOG_TIMESTAMP_COLUMN,
- ROW_ID_COLUMN, SESSION_ID_COLUMN)
+from publisher.prediction_log_parser import (PREDICTION_LOG_MODEL_VERSION_COLUMN,
+ PREDICTION_LOG_TIMESTAMP_COLUMN)
class ObservationSink(abc.ABC):
@@ -83,7 +82,6 @@ def _common_arize_schema_attributes(self) -> dict:
feature_column_names=self._inference_schema.feature_columns,
prediction_id_column_name=self._inference_schema.prediction_id_column,
timestamp_column_name=PREDICTION_LOG_TIMESTAMP_COLUMN,
- tag_column_names=self._inference_schema.tag_columns,
)
def _to_arize_schema(self) -> Tuple[ArizeModelType, ArizeSchema]:
@@ -102,7 +100,7 @@ def _to_arize_schema(self) -> Tuple[ArizeModelType, ArizeSchema]:
elif isinstance(prediction_output, RankingOutput):
schema_attributes = self._common_arize_schema_attributes() | dict(
rank_column_name=prediction_output.rank_column,
- prediction_group_id_column_name=SESSION_ID_COLUMN,
+ prediction_group_id_column_name=self._inference_schema.session_id_column,
)
model_type = ArizeModelType.RANKING
else:
@@ -113,21 +111,10 @@ def _to_arize_schema(self) -> Tuple[ArizeModelType, ArizeSchema]:
return model_type, ArizeSchema(**schema_attributes)
def write(self, df: pd.DataFrame):
- df[self._inference_schema.prediction_id_column] = (
- df[SESSION_ID_COLUMN] + df[ROW_ID_COLUMN]
- )
- if isinstance(self._inference_schema.model_prediction_output, RankingOutput):
- df[
- self._inference_schema.model_prediction_output.prediction_group_id_column
- ] = df[SESSION_ID_COLUMN]
-
- processed_df = self._inference_schema.model_prediction_output.preprocess(
- df, [ObservationType.FEATURE, ObservationType.PREDICTION]
- )
model_type, arize_schema = self._to_arize_schema()
try:
self._client.log(
- dataframe=processed_df,
+ dataframe=df,
environment=Environments.PRODUCTION,
schema=arize_schema,
model_id=self._model_id,
@@ -248,11 +235,15 @@ def schema_fields(self) -> List[SchemaField]:
schema_fields = [
SchemaField(
- name=SESSION_ID_COLUMN,
+ name=self._inference_schema.session_id_column,
+ field_type="STRING",
+ ),
+ SchemaField(
+ name=self._inference_schema.row_id_column,
field_type="STRING",
),
SchemaField(
- name=ROW_ID_COLUMN,
+ name=self._inference_schema.prediction_id_column,
field_type="STRING",
),
SchemaField(
@@ -260,7 +251,7 @@ def schema_fields(self) -> List[SchemaField]:
field_type="TIMESTAMP",
),
SchemaField(
- name=MODEL_VERSION_COLUMN,
+ name=PREDICTION_LOG_MODEL_VERSION_COLUMN,
field_type="STRING",
),
]
diff --git a/python/observation-publisher/publisher/prediction_log_consumer.py b/python/observation-publisher/publisher/prediction_log_consumer.py
index eef099c53..a29d639a8 100644
--- a/python/observation-publisher/publisher/prediction_log_consumer.py
+++ b/python/observation-publisher/publisher/prediction_log_consumer.py
@@ -9,14 +9,13 @@
from caraml.upi.v1.prediction_log_pb2 import PredictionLog
from confluent_kafka import Consumer, KafkaException
from dataclasses_json import DataClassJsonMixin, dataclass_json
-from merlin.observability.inference import InferenceSchema
+from merlin.observability.inference import InferenceSchema, ObservationType
from publisher.config import ObservationSource, ObservationSourceConfig
from publisher.metric import MetricWriter
from publisher.observation_sink import ObservationSink
-from publisher.prediction_log_parser import (MODEL_VERSION_COLUMN,
+from publisher.prediction_log_parser import (PREDICTION_LOG_MODEL_VERSION_COLUMN,
PREDICTION_LOG_TIMESTAMP_COLUMN,
- ROW_ID_COLUMN, SESSION_ID_COLUMN,
PredictionLogFeatureTable,
PredictionLogResultsTable)
@@ -46,7 +45,6 @@ def start_polling(
):
try:
buffered_logs = []
- buffered_max_duration_seconds = 60
buffer_start_time = datetime.now()
while True:
logs = self.poll_new_logs()
@@ -56,7 +54,7 @@ def start_polling(
buffered_duration = (datetime.now() - buffer_start_time).seconds
if (
len(buffered_logs) < self.buffer_capacity
- and buffered_duration < buffered_max_duration_seconds
+ and buffered_duration < self.buffer_max_duration_seconds
):
continue
df = log_batch_to_dataframe(
@@ -188,10 +186,10 @@ def log_to_records(
feature_table.columns
+ prediction_results_table.columns
+ [
- SESSION_ID_COLUMN,
- ROW_ID_COLUMN,
+ inference_schema.session_id_column,
+ inference_schema.row_id_column,
PREDICTION_LOG_TIMESTAMP_COLUMN,
- MODEL_VERSION_COLUMN,
+ PREDICTION_LOG_MODEL_VERSION_COLUMN,
]
)
@@ -206,4 +204,5 @@ def log_batch_to_dataframe(
for log in logs:
rows, column_names = log_to_records(log, inference_schema, model_version)
combined_records.extend(rows)
- return pd.DataFrame.from_records(combined_records, columns=column_names)
+ df = pd.DataFrame.from_records(combined_records, columns=column_names)
+ return inference_schema.preprocess(df, [ObservationType.PREDICTION])
diff --git a/python/observation-publisher/publisher/prediction_log_parser.py b/python/observation-publisher/publisher/prediction_log_parser.py
index 97ecd4d09..4541d7b07 100644
--- a/python/observation-publisher/publisher/prediction_log_parser.py
+++ b/python/observation-publisher/publisher/prediction_log_parser.py
@@ -3,14 +3,12 @@
from typing import Dict, List, Optional, Union
import numpy as np
-from google.protobuf.internal.well_known_types import ListValue, Struct
+from google.protobuf.struct_pb2 import ListValue, Struct
from merlin.observability.inference import InferenceSchema, ValueType
from typing_extensions import Self
-SESSION_ID_COLUMN = "session_id"
-ROW_ID_COLUMN = "row_id"
PREDICTION_LOG_TIMESTAMP_COLUMN = "request_timestamp"
-MODEL_VERSION_COLUMN = "model_version"
+PREDICTION_LOG_MODEL_VERSION_COLUMN = "model_version"
@dataclass
@@ -58,8 +56,16 @@ def from_struct(
def convert_to_numpy_value(
- col_value: Optional[int | str | float | bool], value_type: ValueType
+ col_value: Optional[int | str | float | bool], value_type: Optional[ValueType]
) -> np.int64 | np.float64 | np.bool_ | np.str_:
+ if value_type is None:
+ if isinstance(col_value, (int, float)):
+ return np.float64(col_value)
+ if isinstance(col_value, str):
+ return np.str_(col_value)
+ else:
+ raise ValueError(f"Unable to infer numpy type for type: {type(col_value)}")
+
match value_type:
case ValueType.INT64:
assert isinstance(col_value, (int, float))
@@ -79,7 +85,7 @@ def convert_to_numpy_value(
def list_value_as_string_list(list_value: ListValue) -> List[str]:
string_list: List[str] = []
- for v in list_value:
+ for v in list_value.items():
assert isinstance(v, str)
string_list.append(v)
return string_list
@@ -87,9 +93,10 @@ def list_value_as_string_list(list_value: ListValue) -> List[str]:
def list_value_as_rows(list_value: ListValue) -> List[ListValue]:
rows: List[ListValue] = []
- for d in list_value:
+ for d in list_value.items():
assert isinstance(d, ListValue)
rows.append(d)
+
return rows
@@ -97,11 +104,11 @@ def list_value_as_numpy_list(
list_value: ListValue, column_names: List[str], column_types: Dict[str, ValueType]
) -> List[np.int64 | np.float64 | np.bool_ | np.str_]:
column_values: List[int | str | float | bool | None] = []
- for v in list_value:
+ for v in list_value.items():
assert isinstance(v, (int, str, float, bool, NoneType))
column_values.append(v)
return [
- convert_to_numpy_value(col_value, column_types[col_name])
+ convert_to_numpy_value(col_value, column_types.get(col_name))
for col_value, col_name in zip(column_values, column_names)
]
diff --git a/python/observation-publisher/tests/test_observation_sink.py b/python/observation-publisher/tests/test_observation_sink.py
index f12306a8e..58b5df3c5 100644
--- a/python/observation-publisher/tests/test_observation_sink.py
+++ b/python/observation-publisher/tests/test_observation_sink.py
@@ -62,16 +62,17 @@ def ranking_inference_logs() -> pd.DataFrame:
request_timestamp = datetime(2024, 1, 1, 0, 0, 0).astimezone(tz.UTC)
return pd.DataFrame.from_records(
[
- [5.0, 1.0, "1234", "1001", request_timestamp],
- [4.0, 0.9, "1234", "1002", request_timestamp],
- [3.0, 0.8, "1234", "1003", request_timestamp],
+ [5.0, 1.0, "1234", "1001", request_timestamp, 1],
+ [4.0, 0.9, "1234", "1002", request_timestamp, 1],
+ [3.0, 0.8, "1234", "1003", request_timestamp, 1],
],
columns=[
"rating",
"rank_score",
- "session_id",
- "row_id",
+ "order_id",
+ "driver_id",
"request_timestamp",
+ "_rank"
],
)
@@ -112,7 +113,6 @@ def test_binary_classification_model_preprocessing_for_arize(
def test_ranking_model_preprocessing_for_arize(
- binary_classification_inference_logs: pd.DataFrame,
ranking_inference_logs: pd.DataFrame,
):
inference_schema = InferenceSchema(
@@ -121,9 +121,10 @@ def test_ranking_model_preprocessing_for_arize(
},
model_prediction_output=RankingOutput(
rank_score_column="rank_score",
- prediction_group_id_column="order_id",
relevance_score_column="relevance_score_column",
),
+ session_id_column="order_id",
+ row_id_column="driver_id",
)
arize_client = MockArizeClient(api_key="test", space_key="test")
arize_sink = ArizeSink(
diff --git a/python/observation-publisher/tests/test_prediction_log_consumer.py b/python/observation-publisher/tests/test_prediction_log_consumer.py
index c41f9b60d..6c2920557 100644
--- a/python/observation-publisher/tests/test_prediction_log_consumer.py
+++ b/python/observation-publisher/tests/test_prediction_log_consumer.py
@@ -14,7 +14,7 @@
def new_prediction_log(
model_id: str,
model_version: str,
- prediction_id: str,
+ session_id: str,
row_ids: List[str],
input_columns: List[str],
input_data: List[List[Any]],
@@ -30,7 +30,7 @@ def new_prediction_log(
raise ValueError("input columns and input data must have the same length")
prediction_log = PredictionLog()
- prediction_log.prediction_id = prediction_id
+ prediction_log.prediction_id = session_id
prediction_log.model_name = model_id
prediction_log.model_version = model_version
prediction_log.input.features_table.update(
@@ -62,11 +62,13 @@ def test_log_to_dataframe():
},
model_prediction_output=BinaryClassificationOutput(
prediction_score_column="prediction_score",
- actual_label_column="actual_label",
+ actual_score_column="actual_score",
positive_class_label="fraud",
negative_class_label="non fraud",
score_threshold=0.5,
),
+ session_id_column="order_id",
+ row_id_column="driver_id"
)
input_columns = [
"acceptance_rate",
@@ -77,7 +79,7 @@ def test_log_to_dataframe():
request_timestamp = datetime(2021, 1, 1, 0, 0, 0)
prediction_logs = [
new_prediction_log(
- prediction_id="1234",
+ session_id="1234",
model_id=model_id,
model_version=model_version,
input_columns=input_columns,
@@ -94,7 +96,7 @@ def test_log_to_dataframe():
row_ids=["a", "b"],
),
new_prediction_log(
- prediction_id="5678",
+ session_id="5678",
model_id=model_id,
model_version=model_version,
input_columns=input_columns,
@@ -116,23 +118,25 @@ def test_log_to_dataframe():
)
expected_df = pd.DataFrame.from_records(
[
- [0.8, 24, "FOOD", 0.9, "1234", "a", request_timestamp, model_version],
- [0.5, 2, "RIDE", 0.5, "1234", "b", request_timestamp, model_version],
- [1.0, 13, "CAR", 0.4, "5678", "c", request_timestamp, model_version],
- [0.4, 60, "RIDE", 0.2, "5678", "d", request_timestamp, model_version],
+ [0.8, 24, "FOOD", 0.9, "fraud", "1234", "a", "1234_a", request_timestamp, model_version],
+ [0.5, 2, "RIDE", 0.5, "fraud", "1234", "b", "1234_b", request_timestamp, model_version],
+ [1.0, 13, "CAR", 0.4, "non fraud", "5678", "c", "5678_c", request_timestamp, model_version],
+ [0.4, 60, "RIDE", 0.2, "non fraud", "5678", "d", "5678_d", request_timestamp, model_version],
],
columns=[
"acceptance_rate",
"minutes_since_last_order",
"service_type",
"prediction_score",
- "session_id",
- "row_id",
+ "_prediction_label",
+ "order_id",
+ "driver_id",
+ "prediction_id",
"request_timestamp",
"model_version",
],
)
- assert_frame_equal(prediction_logs_df, expected_df)
+ assert_frame_equal(prediction_logs_df, expected_df, check_like=True)
def test_empty_column_conversion_to_dataframe():
@@ -144,7 +148,7 @@ def test_empty_column_conversion_to_dataframe():
},
model_prediction_output=BinaryClassificationOutput(
prediction_score_column="prediction_score",
- actual_label_column="actual_label",
+ actual_score_column="actual_score",
positive_class_label="fraud",
negative_class_label="non fraud",
score_threshold=0.5,
@@ -152,7 +156,7 @@ def test_empty_column_conversion_to_dataframe():
)
prediction_logs = [
new_prediction_log(
- prediction_id="1234",
+ session_id="1234",
model_id=model_id,
model_version=model_version,
input_columns=["acceptance_rate"],
@@ -175,8 +179,10 @@ def test_empty_column_conversion_to_dataframe():
[
np.NaN,
0.5,
+ "fraud",
"1234",
"a",
+ "1234_a",
datetime(2021, 1, 1, 0, 0, 0),
"0.1.0",
],
@@ -184,10 +190,12 @@ def test_empty_column_conversion_to_dataframe():
columns=[
"acceptance_rate",
"prediction_score",
+ "_prediction_label",
"session_id",
"row_id",
+ "prediction_id",
"request_timestamp",
"model_version",
],
)
- assert_frame_equal(prediction_logs_df, expected_df)
+ assert_frame_equal(prediction_logs_df, expected_df, check_like=True)
diff --git a/python/sdk/client/models/binary_classification_output.py b/python/sdk/client/models/binary_classification_output.py
index b99ebf437..253c9df07 100644
--- a/python/sdk/client/models/binary_classification_output.py
+++ b/python/sdk/client/models/binary_classification_output.py
@@ -31,12 +31,12 @@ class BinaryClassificationOutput(BaseModel):
BinaryClassificationOutput
""" # noqa: E501
prediction_score_column: StrictStr
- actual_label_column: Optional[StrictStr] = None
+ actual_score_column: Optional[StrictStr] = None
positive_class_label: StrictStr
negative_class_label: StrictStr
score_threshold: Optional[Union[StrictFloat, StrictInt]] = None
output_class: ModelPredictionOutputClass
- __properties: ClassVar[List[str]] = ["prediction_score_column", "actual_label_column", "positive_class_label", "negative_class_label", "score_threshold", "output_class"]
+ __properties: ClassVar[List[str]] = ["prediction_score_column", "actual_score_column", "positive_class_label", "negative_class_label", "score_threshold", "output_class"]
model_config = {
"populate_by_name": True,
@@ -87,7 +87,7 @@ def from_dict(cls, obj: Dict) -> Self:
_obj = cls.model_validate({
"prediction_score_column": obj.get("prediction_score_column"),
- "actual_label_column": obj.get("actual_label_column"),
+ "actual_score_column": obj.get("actual_score_column"),
"positive_class_label": obj.get("positive_class_label"),
"negative_class_label": obj.get("negative_class_label"),
"score_threshold": obj.get("score_threshold"),
diff --git a/python/sdk/client/models/ranking_output.py b/python/sdk/client/models/ranking_output.py
index 31f673a75..4e9cf30a4 100644
--- a/python/sdk/client/models/ranking_output.py
+++ b/python/sdk/client/models/ranking_output.py
@@ -31,10 +31,9 @@ class RankingOutput(BaseModel):
RankingOutput
""" # noqa: E501
rank_score_column: StrictStr
- prediction_group_id_column: StrictStr
relevance_score_column: Optional[StrictStr] = None
output_class: ModelPredictionOutputClass
- __properties: ClassVar[List[str]] = ["rank_score_column", "prediction_group_id_column", "relevance_score_column", "output_class"]
+ __properties: ClassVar[List[str]] = ["rank_score_column", "relevance_score_column", "output_class"]
model_config = {
"populate_by_name": True,
@@ -85,7 +84,6 @@ def from_dict(cls, obj: Dict) -> Self:
_obj = cls.model_validate({
"rank_score_column": obj.get("rank_score_column"),
- "prediction_group_id_column": obj.get("prediction_group_id_column"),
"relevance_score_column": obj.get("relevance_score_column"),
"output_class": obj.get("output_class")
})
diff --git a/python/sdk/client/models/schema_spec.py b/python/sdk/client/models/schema_spec.py
index 26e02f4e2..a50dff7b3 100644
--- a/python/sdk/client/models/schema_spec.py
+++ b/python/sdk/client/models/schema_spec.py
@@ -31,11 +31,13 @@ class SchemaSpec(BaseModel):
"""
SchemaSpec
""" # noqa: E501
- prediction_id_column: StrictStr
+ prediction_id_column: Optional[StrictStr] = None
+ session_id_column: Optional[StrictStr] = None
+ row_id_column: Optional[StrictStr] = None
model_prediction_output: ModelPredictionOutput
tag_columns: Optional[List[StrictStr]] = None
feature_types: Dict[str, ValueType]
- __properties: ClassVar[List[str]] = ["prediction_id_column", "model_prediction_output", "tag_columns", "feature_types"]
+ __properties: ClassVar[List[str]] = ["prediction_id_column", "session_id_column", "row_id_column", "model_prediction_output", "tag_columns", "feature_types"]
model_config = {
"populate_by_name": True,
@@ -89,6 +91,8 @@ def from_dict(cls, obj: Dict) -> Self:
_obj = cls.model_validate({
"prediction_id_column": obj.get("prediction_id_column"),
+ "session_id_column": obj.get("session_id_column"),
+ "row_id_column": obj.get("row_id_column"),
"model_prediction_output": ModelPredictionOutput.from_dict(obj.get("model_prediction_output")) if obj.get("model_prediction_output") is not None else None,
"tag_columns": obj.get("tag_columns"),
"feature_types": dict((_k, _v) for _k, _v in obj.get("feature_types").items())
diff --git a/python/sdk/client_README.md b/python/sdk/client_README.md
new file mode 100644
index 000000000..069653994
--- /dev/null
+++ b/python/sdk/client_README.md
@@ -0,0 +1,207 @@
+# merlin-sdk
+API Guide for accessing Merlin's model management, deployment, and serving functionalities
+
+The `client` package is automatically generated by the [OpenAPI Generator](https://openapi-generator.tech) project:
+
+- API version: 0.14.0
+- Package version: 1.0.0
+- Build package: org.openapitools.codegen.languages.PythonClientCodegen
+
+## Requirements
+
+Python 3.7+
+
+## Installation & Usage
+
+This Python library package is generated without supporting files such as setup.py or a requirements file.
+
+To be able to use it, you will need these dependencies in your own package that uses this library:
+
+* urllib3 >= 1.25.3
+* python-dateutil
+* pydantic
+
+## Getting Started
+
+In your own code, to use this library to connect and interact with merlin-sdk,
+you can run the following:
+
+```python
+import os
+import time
+import client
+from client.rest import ApiException
+from pprint import pprint
+
+# Defining the host is optional and defaults to http://localhost:8080/v1
+# See configuration.py for a list of all supported configuration parameters.
+configuration = client.Configuration(
+ host = "http://localhost:8080/v1"
+)
+
+# The client must configure the authentication and authorization parameters
+# in accordance with the API server security policy.
+# Examples for each auth method are provided below, use the example that
+# satisfies your auth use case.
+
+# Configure API key authorization: Bearer
+configuration.api_key['Bearer'] = os.environ["API_KEY"]
+
+# Uncomment below to setup prefix (e.g. Bearer) for API key, if needed
+# configuration.api_key_prefix['Bearer'] = 'Bearer'
+
+
+# Enter a context with an instance of the API client
+with client.ApiClient(configuration) as api_client:
+ # Create an instance of the API class
+ api_instance = client.AlertApi(api_client)
+
+ try:
+ # Lists teams for alert notification channel.
+ api_response = api_instance.alerts_teams_get()
+ print("The response of AlertApi->alerts_teams_get:\n")
+ pprint(api_response)
+ except ApiException as e:
+ print("Exception when calling AlertApi->alerts_teams_get: %s\n" % e)
+
+```
+
+## Documentation for API Endpoints
+
+All URIs are relative to *http://localhost:8080/v1*
+
+Class | Method | HTTP request | Description
+------------ | ------------- | ------------- | -------------
+*AlertApi* | [**alerts_teams_get**](client/docs/AlertApi.md#alerts_teams_get) | **GET** /alerts/teams | Lists teams for alert notification channel.
+*AlertApi* | [**models_model_id_alerts_get**](client/docs/AlertApi.md#models_model_id_alerts_get) | **GET** /models/{model_id}/alerts | Lists alerts for given model.
+*AlertApi* | [**models_model_id_endpoints_model_endpoint_id_alert_get**](client/docs/AlertApi.md#models_model_id_endpoints_model_endpoint_id_alert_get) | **GET** /models/{model_id}/endpoints/{model_endpoint_id}/alert | Gets alert for given model endpoint.
+*AlertApi* | [**models_model_id_endpoints_model_endpoint_id_alert_post**](client/docs/AlertApi.md#models_model_id_endpoints_model_endpoint_id_alert_post) | **POST** /models/{model_id}/endpoints/{model_endpoint_id}/alert | Creates alert for given model endpoint.
+*AlertApi* | [**models_model_id_endpoints_model_endpoint_id_alert_put**](client/docs/AlertApi.md#models_model_id_endpoints_model_endpoint_id_alert_put) | **PUT** /models/{model_id}/endpoints/{model_endpoint_id}/alert | Creates alert for given model endpoint.
+*EndpointApi* | [**models_model_id_versions_version_id_endpoint_endpoint_id_containers_get**](client/docs/EndpointApi.md#models_model_id_versions_version_id_endpoint_endpoint_id_containers_get) | **GET** /models/{model_id}/versions/{version_id}/endpoint/{endpoint_id}/containers | Get all containers belonging to a version endpoint
+*EndpointApi* | [**models_model_id_versions_version_id_endpoint_endpoint_id_delete**](client/docs/EndpointApi.md#models_model_id_versions_version_id_endpoint_endpoint_id_delete) | **DELETE** /models/{model_id}/versions/{version_id}/endpoint/{endpoint_id} | Undeploy the specified model version deployment
+*EndpointApi* | [**models_model_id_versions_version_id_endpoint_endpoint_id_get**](client/docs/EndpointApi.md#models_model_id_versions_version_id_endpoint_endpoint_id_get) | **GET** /models/{model_id}/versions/{version_id}/endpoint/{endpoint_id} | Get version endpoint resource
+*EndpointApi* | [**models_model_id_versions_version_id_endpoint_endpoint_id_put**](client/docs/EndpointApi.md#models_model_id_versions_version_id_endpoint_endpoint_id_put) | **PUT** /models/{model_id}/versions/{version_id}/endpoint/{endpoint_id} | Modify version endpoint, this API will redeploy the associated deployment
+*EndpointApi* | [**models_model_id_versions_version_id_endpoint_get**](client/docs/EndpointApi.md#models_model_id_versions_version_id_endpoint_get) | **GET** /models/{model_id}/versions/{version_id}/endpoint | List all endpoints of a model version
+*EndpointApi* | [**models_model_id_versions_version_id_endpoint_post**](client/docs/EndpointApi.md#models_model_id_versions_version_id_endpoint_post) | **POST** /models/{model_id}/versions/{version_id}/endpoint | Deploy specific version of the models
+*EnvironmentApi* | [**environments_get**](client/docs/EnvironmentApi.md#environments_get) | **GET** /environments | List available environment
+*LogApi* | [**logs_get**](client/docs/LogApi.md#logs_get) | **GET** /logs | Retrieve log from a container
+*ModelEndpointsApi* | [**models_model_id_endpoints_get**](client/docs/ModelEndpointsApi.md#models_model_id_endpoints_get) | **GET** /models/{model_id}/endpoints | List model endpoint
+*ModelEndpointsApi* | [**models_model_id_endpoints_model_endpoint_id_delete**](client/docs/ModelEndpointsApi.md#models_model_id_endpoints_model_endpoint_id_delete) | **DELETE** /models/{model_id}/endpoints/{model_endpoint_id} | Stop serving traffic to the model endpoint, then delete it.
+*ModelEndpointsApi* | [**models_model_id_endpoints_model_endpoint_id_get**](client/docs/ModelEndpointsApi.md#models_model_id_endpoints_model_endpoint_id_get) | **GET** /models/{model_id}/endpoints/{model_endpoint_id} | Get a model endpoint
+*ModelEndpointsApi* | [**models_model_id_endpoints_model_endpoint_id_put**](client/docs/ModelEndpointsApi.md#models_model_id_endpoints_model_endpoint_id_put) | **PUT** /models/{model_id}/endpoints/{model_endpoint_id} | Update model endpoint data. Mainly used to update its rule.
+*ModelEndpointsApi* | [**models_model_id_endpoints_post**](client/docs/ModelEndpointsApi.md#models_model_id_endpoints_post) | **POST** /models/{model_id}/endpoints | Create a model endpoint
+*ModelEndpointsApi* | [**projects_project_id_model_endpoints_get**](client/docs/ModelEndpointsApi.md#projects_project_id_model_endpoints_get) | **GET** /projects/{project_id}/model_endpoints | List existing model endpoints for all models in particular project
+*ModelSchemaApi* | [**models_model_id_schemas_get**](client/docs/ModelSchemaApi.md#models_model_id_schemas_get) | **GET** /models/{model_id}/schemas | List all of the model schemas
+*ModelSchemaApi* | [**models_model_id_schemas_put**](client/docs/ModelSchemaApi.md#models_model_id_schemas_put) | **PUT** /models/{model_id}/schemas | Creating new schemas for a model
+*ModelSchemaApi* | [**models_model_id_schemas_schema_id_delete**](client/docs/ModelSchemaApi.md#models_model_id_schemas_schema_id_delete) | **DELETE** /models/{model_id}/schemas/{schema_id} | Delete schema
+*ModelSchemaApi* | [**models_model_id_schemas_schema_id_get**](client/docs/ModelSchemaApi.md#models_model_id_schemas_schema_id_get) | **GET** /models/{model_id}/schemas/{schema_id} | Get detail of the schema
+*ModelsApi* | [**alerts_teams_get**](client/docs/ModelsApi.md#alerts_teams_get) | **GET** /alerts/teams | Lists teams for alert notification channel.
+*ModelsApi* | [**models_model_id_alerts_get**](client/docs/ModelsApi.md#models_model_id_alerts_get) | **GET** /models/{model_id}/alerts | Lists alerts for given model.
+*ModelsApi* | [**models_model_id_endpoints_model_endpoint_id_alert_get**](client/docs/ModelsApi.md#models_model_id_endpoints_model_endpoint_id_alert_get) | **GET** /models/{model_id}/endpoints/{model_endpoint_id}/alert | Gets alert for given model endpoint.
+*ModelsApi* | [**models_model_id_endpoints_model_endpoint_id_alert_post**](client/docs/ModelsApi.md#models_model_id_endpoints_model_endpoint_id_alert_post) | **POST** /models/{model_id}/endpoints/{model_endpoint_id}/alert | Creates alert for given model endpoint.
+*ModelsApi* | [**models_model_id_endpoints_model_endpoint_id_alert_put**](client/docs/ModelsApi.md#models_model_id_endpoints_model_endpoint_id_alert_put) | **PUT** /models/{model_id}/endpoints/{model_endpoint_id}/alert | Creates alert for given model endpoint.
+*ModelsApi* | [**projects_project_id_models_get**](client/docs/ModelsApi.md#projects_project_id_models_get) | **GET** /projects/{project_id}/models | List existing models
+*ModelsApi* | [**projects_project_id_models_model_id_delete**](client/docs/ModelsApi.md#projects_project_id_models_model_id_delete) | **DELETE** /projects/{project_id}/models/{model_id} | Delete model
+*ModelsApi* | [**projects_project_id_models_model_id_get**](client/docs/ModelsApi.md#projects_project_id_models_model_id_get) | **GET** /projects/{project_id}/models/{model_id} | Get model
+*ModelsApi* | [**projects_project_id_models_post**](client/docs/ModelsApi.md#projects_project_id_models_post) | **POST** /projects/{project_id}/models | Register a new model
+*PredictionJobsApi* | [**models_model_id_versions_version_id_jobs_get**](client/docs/PredictionJobsApi.md#models_model_id_versions_version_id_jobs_get) | **GET** /models/{model_id}/versions/{version_id}/jobs | List all prediction jobs of a model version
+*PredictionJobsApi* | [**models_model_id_versions_version_id_jobs_job_id_containers_get**](client/docs/PredictionJobsApi.md#models_model_id_versions_version_id_jobs_job_id_containers_get) | **GET** /models/{model_id}/versions/{version_id}/jobs/{job_id}/containers | Get all containers belonging to a prediction job
+*PredictionJobsApi* | [**models_model_id_versions_version_id_jobs_job_id_get**](client/docs/PredictionJobsApi.md#models_model_id_versions_version_id_jobs_job_id_get) | **GET** /models/{model_id}/versions/{version_id}/jobs/{job_id} | Get prediction jobs with given id
+*PredictionJobsApi* | [**models_model_id_versions_version_id_jobs_job_id_stop_put**](client/docs/PredictionJobsApi.md#models_model_id_versions_version_id_jobs_job_id_stop_put) | **PUT** /models/{model_id}/versions/{version_id}/jobs/{job_id}/stop | Stop prediction jobs with given id
+*PredictionJobsApi* | [**models_model_id_versions_version_id_jobs_post**](client/docs/PredictionJobsApi.md#models_model_id_versions_version_id_jobs_post) | **POST** /models/{model_id}/versions/{version_id}/jobs | Create a prediction job from the given model version
+*PredictionJobsApi* | [**projects_project_id_jobs_get**](client/docs/PredictionJobsApi.md#projects_project_id_jobs_get) | **GET** /projects/{project_id}/jobs | List all prediction jobs created using the model
+*ProjectApi* | [**projects_get**](client/docs/ProjectApi.md#projects_get) | **GET** /projects | List existing projects
+*ProjectApi* | [**projects_post**](client/docs/ProjectApi.md#projects_post) | **POST** /projects | Create new project
+*ProjectApi* | [**projects_project_id_get**](client/docs/ProjectApi.md#projects_project_id_get) | **GET** /projects/{project_id} | Get project
+*ProjectApi* | [**projects_project_id_put**](client/docs/ProjectApi.md#projects_project_id_put) | **PUT** /projects/{project_id} | Update project
+*SecretApi* | [**projects_project_id_secrets_get**](client/docs/SecretApi.md#projects_project_id_secrets_get) | **GET** /projects/{project_id}/secrets | List secret
+*SecretApi* | [**projects_project_id_secrets_post**](client/docs/SecretApi.md#projects_project_id_secrets_post) | **POST** /projects/{project_id}/secrets | Create secret
+*SecretApi* | [**projects_project_id_secrets_secret_id_delete**](client/docs/SecretApi.md#projects_project_id_secrets_secret_id_delete) | **DELETE** /projects/{project_id}/secrets/{secret_id} | Delete secret
+*SecretApi* | [**projects_project_id_secrets_secret_id_patch**](client/docs/SecretApi.md#projects_project_id_secrets_secret_id_patch) | **PATCH** /projects/{project_id}/secrets/{secret_id} | Update secret
+*StandardTransformerApi* | [**standard_transformer_simulate_post**](client/docs/StandardTransformerApi.md#standard_transformer_simulate_post) | **POST** /standard_transformer/simulate | Simulate standard transformer
+*VersionApi* | [**models_model_id_versions_get**](client/docs/VersionApi.md#models_model_id_versions_get) | **GET** /models/{model_id}/versions | List versions of the models
+*VersionApi* | [**models_model_id_versions_post**](client/docs/VersionApi.md#models_model_id_versions_post) | **POST** /models/{model_id}/versions | Log a new version of the models
+*VersionApi* | [**models_model_id_versions_version_id_delete**](client/docs/VersionApi.md#models_model_id_versions_version_id_delete) | **DELETE** /models/{model_id}/versions/{version_id} | Delete version by ID from model
+*VersionApi* | [**models_model_id_versions_version_id_get**](client/docs/VersionApi.md#models_model_id_versions_version_id_get) | **GET** /models/{model_id}/versions/{version_id} | Get version by ID from model
+*VersionApi* | [**models_model_id_versions_version_id_patch**](client/docs/VersionApi.md#models_model_id_versions_version_id_patch) | **PATCH** /models/{model_id}/versions/{version_id} | Patch the version
+
+
+## Documentation For Models
+
+ - [AlertConditionMetricType](client/docs/AlertConditionMetricType.md)
+ - [AlertConditionSeverity](client/docs/AlertConditionSeverity.md)
+ - [AutoscalingPolicy](client/docs/AutoscalingPolicy.md)
+ - [BinaryClassificationOutput](client/docs/BinaryClassificationOutput.md)
+ - [Config](client/docs/Config.md)
+ - [Container](client/docs/Container.md)
+ - [CustomPredictor](client/docs/CustomPredictor.md)
+ - [DeploymentMode](client/docs/DeploymentMode.md)
+ - [EndpointStatus](client/docs/EndpointStatus.md)
+ - [EnvVar](client/docs/EnvVar.md)
+ - [Environment](client/docs/Environment.md)
+ - [FileFormat](client/docs/FileFormat.md)
+ - [GPUConfig](client/docs/GPUConfig.md)
+ - [GPUToleration](client/docs/GPUToleration.md)
+ - [Label](client/docs/Label.md)
+ - [Logger](client/docs/Logger.md)
+ - [LoggerConfig](client/docs/LoggerConfig.md)
+ - [LoggerMode](client/docs/LoggerMode.md)
+ - [MetricsType](client/docs/MetricsType.md)
+ - [MockResponse](client/docs/MockResponse.md)
+ - [Model](client/docs/Model.md)
+ - [ModelEndpoint](client/docs/ModelEndpoint.md)
+ - [ModelEndpointAlert](client/docs/ModelEndpointAlert.md)
+ - [ModelEndpointAlertCondition](client/docs/ModelEndpointAlertCondition.md)
+ - [ModelEndpointRule](client/docs/ModelEndpointRule.md)
+ - [ModelEndpointRuleDestination](client/docs/ModelEndpointRuleDestination.md)
+ - [ModelPredictionConfig](client/docs/ModelPredictionConfig.md)
+ - [ModelPredictionOutput](client/docs/ModelPredictionOutput.md)
+ - [ModelPredictionOutputClass](client/docs/ModelPredictionOutputClass.md)
+ - [ModelSchema](client/docs/ModelSchema.md)
+ - [OperationTracing](client/docs/OperationTracing.md)
+ - [PipelineTracing](client/docs/PipelineTracing.md)
+ - [PredictionJob](client/docs/PredictionJob.md)
+ - [PredictionJobConfig](client/docs/PredictionJobConfig.md)
+ - [PredictionJobConfigBigquerySink](client/docs/PredictionJobConfigBigquerySink.md)
+ - [PredictionJobConfigBigquerySource](client/docs/PredictionJobConfigBigquerySource.md)
+ - [PredictionJobConfigGcsSink](client/docs/PredictionJobConfigGcsSink.md)
+ - [PredictionJobConfigGcsSource](client/docs/PredictionJobConfigGcsSource.md)
+ - [PredictionJobConfigModel](client/docs/PredictionJobConfigModel.md)
+ - [PredictionJobConfigModelResult](client/docs/PredictionJobConfigModelResult.md)
+ - [PredictionJobResourceRequest](client/docs/PredictionJobResourceRequest.md)
+ - [PredictionLoggerConfig](client/docs/PredictionLoggerConfig.md)
+ - [Project](client/docs/Project.md)
+ - [Protocol](client/docs/Protocol.md)
+ - [RankingOutput](client/docs/RankingOutput.md)
+ - [RegressionOutput](client/docs/RegressionOutput.md)
+ - [ResourceRequest](client/docs/ResourceRequest.md)
+ - [ResultType](client/docs/ResultType.md)
+ - [SaveMode](client/docs/SaveMode.md)
+ - [SchemaSpec](client/docs/SchemaSpec.md)
+ - [Secret](client/docs/Secret.md)
+ - [StandardTransformerSimulationRequest](client/docs/StandardTransformerSimulationRequest.md)
+ - [StandardTransformerSimulationResponse](client/docs/StandardTransformerSimulationResponse.md)
+ - [Transformer](client/docs/Transformer.md)
+ - [ValueType](client/docs/ValueType.md)
+ - [Version](client/docs/Version.md)
+ - [VersionEndpoint](client/docs/VersionEndpoint.md)
+
+
+
+## Documentation For Authorization
+
+
+Authentication schemes defined for the API:
+
+### Bearer
+
+- **Type**: API key
+- **API key parameter name**: Authorization
+- **Location**: HTTP header
+
+
+## Author
+
+
+
+
diff --git a/python/sdk/merlin/observability/inference.py b/python/sdk/merlin/observability/inference.py
index 0a067c542..dc6287069 100644
--- a/python/sdk/merlin/observability/inference.py
+++ b/python/sdk/merlin/observability/inference.py
@@ -80,13 +80,15 @@ def decode(cls, input: Dict):
be made to the input dataframe.
:param df: Input dataframe.
+ :param session_id_column: Name of the column containing the session id.
+ :param row_id_column: Name of the column containing the row id.
:param observation_types: Types of observations to be included in the output dataframe.
:return: output dataframe
"""
@abc.abstractmethod
def preprocess(
- self, df: pd.DataFrame, observation_types: List[ObservationType]
+ self, df: pd.DataFrame, session_id_column: str, row_id_column: str, observation_types: List[ObservationType]
) -> pd.DataFrame:
raise NotImplementedError
@@ -120,7 +122,7 @@ class RegressionOutput(PredictionOutput):
actual_score_column: str
def preprocess(
- self, df: pd.DataFrame, observation_types: List[ObservationType]
+ self, df: pd.DataFrame, session_id_column: str, row_id_column: str, observation_types: List[ObservationType]
) -> pd.DataFrame:
return df
@@ -144,14 +146,14 @@ class BinaryClassificationOutput(PredictionOutput):
Attributes:
prediction_score_column: Name of the column containing the prediction score.
Prediction score must be between 0.0 and 1.0.
- actual_label_column: Name of the column containing the actual class.
+ actual_score_column: Name of the column containing the actual score.
positive_class_label: Label for positive class.
negative_class_label: Label for negative class.
score_threshold: Score threshold for prediction to be considered as positive class.
"""
prediction_score_column: str
- actual_label_column: str
+ actual_score_column: str
positive_class_label: str
negative_class_label: str
score_threshold: float = 0.5
@@ -161,8 +163,8 @@ def prediction_label_column(self) -> str:
return "_prediction_label"
@property
- def actual_score_column(self) -> str:
- return "_actual_score"
+ def actual_label_column(self) -> str:
+ return "_actual_label"
def prediction_label(self, prediction_score: float) -> str:
"""
@@ -176,28 +178,28 @@ def prediction_label(self, prediction_score: float) -> str:
else self.negative_class_label
)
- def actual_score(self, actual_label: str) -> float:
+ def actual_label(self, actual_score: int) -> str:
"""
- Derive actual score from actual label.
- :param actual_label: Actual label.
- :return: actual score. Either 0.0 for negative class or 1.0 for positive class.
+ Derive actual label from actual score.
+ :param actual_score: Actual score.
+ :return: actual label
"""
- if actual_label not in [self.positive_class_label, self.negative_class_label]:
- raise ValueError(
- f"Actual label must be one of the classes, got {actual_label}"
- )
- return 1.0 if actual_label == self.positive_class_label else 0.0
+ return (
+ self.positive_class_label
+ if actual_score >= self.score_threshold
+ else self.negative_class_label
+ )
def preprocess(
- self, df: pd.DataFrame, observation_types: List[ObservationType]
+ self, df: pd.DataFrame, session_id_column: str, row_id_column: str, observation_types: List[ObservationType]
) -> pd.DataFrame:
if ObservationType.PREDICTION in observation_types:
df[self.prediction_label_column] = df[self.prediction_score_column].apply(
self.prediction_label
)
if ObservationType.GROUND_TRUTH in observation_types:
- df[self.actual_score_column] = df[self.actual_label_column].apply(
- self.actual_score
+ df[self.actual_label_column] = df[self.actual_score_column].apply(
+ self.actual_label
)
return df
@@ -218,13 +220,11 @@ def ground_truth_types(self) -> Dict[str, ValueType]:
@dataclass
class RankingOutput(PredictionOutput):
rank_score_column: str
- prediction_group_id_column: str
relevance_score_column: str
"""
Ranking model prediction output schema.
Attributes:
rank_score_column: Name of the column containing the ranking score of the prediction.
- prediction_group_id_column: Name of the column containing the prediction group id.
relevance_score_column: Name of the column containing the relevance score of the prediction.
"""
@@ -233,11 +233,11 @@ def rank_column(self) -> str:
return "_rank"
def preprocess(
- self, df: pd.DataFrame, observation_types: List[ObservationType]
+ self, df: pd.DataFrame, session_id_column: str, row_id_column: str, observation_types: List[ObservationType]
) -> pd.DataFrame:
if ObservationType.PREDICTION in observation_types:
df[self.rank_column] = (
- df.groupby(self.prediction_group_id_column)[self.rank_score_column]
+ df.groupby(session_id_column)[self.rank_score_column]
.rank(method="first", ascending=False)
.astype(np.int_)
)
@@ -245,8 +245,8 @@ def preprocess(
def prediction_types(self) -> Dict[str, ValueType]:
return {
+ self.rank_score_column: ValueType.FLOAT64,
self.rank_column: ValueType.INT64,
- self.prediction_group_id_column: ValueType.STRING,
}
def ground_truth_types(self) -> Dict[str, ValueType]:
@@ -265,6 +265,8 @@ class InferenceSchema:
decoder=PredictionOutput.decode,
)
)
+ session_id_column: str = "session_id"
+ row_id_column: str = "row_id"
prediction_id_column: str = "prediction_id"
tag_columns: Optional[List[str]] = None
@@ -279,3 +281,12 @@ def prediction_columns(self) -> List[str]:
@property
def ground_truth_columns(self) -> List[str]:
return list(self.model_prediction_output.ground_truth_types().keys())
+
+ def preprocess(
+ self, df: pd.DataFrame, observation_types: List[ObservationType]
+ ) -> pd.DataFrame:
+ if observation_types != ObservationType.FEATURE:
+ df[self.prediction_id_column] = df[self.session_id_column] + "_" + df[self.row_id_column]
+ return self.model_prediction_output.preprocess(
+ df, self.session_id_column, self.row_id_column, observation_types
+ )
diff --git a/python/sdk/test/observability_test.py b/python/sdk/test/observability_test.py
index a670a854e..1241a705c 100644
--- a/python/sdk/test/observability_test.py
+++ b/python/sdk/test/observability_test.py
@@ -8,7 +8,7 @@
def binary_classification_output() -> BinaryClassificationOutput:
return BinaryClassificationOutput(
prediction_score_column="prediction_score",
- actual_label_column="target",
+ actual_score_column="actual_score",
positive_class_label="ACCEPTED",
negative_class_label="REJECTED",
score_threshold=0.5,
@@ -21,7 +21,7 @@ def test_prediction_output_encoding(binary_classification_output: BinaryClassifi
assert encoded_output == {
"output_class": "BinaryClassificationOutput",
"prediction_score_column": "prediction_score",
- "actual_label_column": "target",
+ "actual_score_column": "actual_score",
"positive_class_label": "ACCEPTED",
"negative_class_label": "REJECTED",
"score_threshold": 0.5,
@@ -35,31 +35,35 @@ def test_prediction_output_encoding(binary_classification_output: BinaryClassifi
def test_binary_classification_preprocessing(binary_classification_output: BinaryClassificationOutput):
input_df = pd.DataFrame.from_records(
[
- [0.8, "ACCEPTED"],
- [0.5, "ACCEPTED"],
- [1.0, "REJECTED"],
- [0.4, "ACCEPTED"]
+ ["9001", "1001", 0.8, 1.0],
+ ["9002", "1001", 0.5, 1.0],
+ ["9002", "1002", 1.0, 0.0],
+ ["9003", "1003", 0.4, 1.0]
],
columns=[
+ "order_id",
+ "driver_id",
"prediction_score",
- "target"
+ "actual_score"
],
)
- processed_df = binary_classification_output.preprocess(input_df, [ObservationType.PREDICTION, ObservationType.GROUND_TRUTH])
+ processed_df = binary_classification_output.preprocess(input_df, "order_id", "driver_id", [ObservationType.PREDICTION, ObservationType.GROUND_TRUTH])
pd.testing.assert_frame_equal(
processed_df,
pd.DataFrame.from_records(
[
- [0.8, "ACCEPTED" , "ACCEPTED", 1.0],
- [0.5, "ACCEPTED", "ACCEPTED", 1.0],
- [1.0, "REJECTED", "ACCEPTED", 0.0],
- [0.4, "ACCEPTED", "REJECTED", 1.0]
+ ["9001", "1001", 0.8, 1.0, "ACCEPTED" , "ACCEPTED"],
+ ["9002", "1001", 0.5, 1.0, "ACCEPTED", "ACCEPTED"],
+ ["9002", "1002", 1.0, 0.0, "ACCEPTED", "REJECTED"],
+ ["9003", "1003", 0.4, 1.0, "REJECTED", "ACCEPTED"]
],
columns=[
+ "order_id",
+ "driver_id",
"prediction_score",
- "target",
+ "actual_score",
"_prediction_label",
- "_actual_score",
+ "_actual_label",
],
),
)
@@ -69,7 +73,6 @@ def test_binary_classification_preprocessing(binary_classification_output: Binar
def ranking_prediction_output() -> RankingOutput:
return RankingOutput(
rank_score_column="score",
- prediction_group_id_column="order_id",
relevance_score_column="relevance_score_column",
)
@@ -78,33 +81,35 @@ def ranking_prediction_output() -> RankingOutput:
def test_ranking_preprocessing(ranking_prediction_output: RankingOutput):
input_df = pd.DataFrame.from_records(
[
- [1.0, "1001"],
- [0.8, "1001"],
- [0.1, "1001"],
- [0.3, "1002"],
- [0.2, "1002"],
- [0.1, "1002"],
+ [1.0, "1001", "d1"],
+ [0.8, "1001", "d2"],
+ [0.1, "1001", "d3"],
+ [0.3, "1002", "d1"],
+ [0.2, "1002", "d2"],
+ [0.1, "1002", "d3"],
],
columns=[
"score",
"order_id",
+ "driver_id"
],
)
- processed_df = ranking_prediction_output.preprocess(input_df, [ObservationType.PREDICTION])
+ processed_df = ranking_prediction_output.preprocess(input_df, "order_id", "driver_id", [ObservationType.PREDICTION])
pd.testing.assert_frame_equal(
processed_df,
pd.DataFrame.from_records(
[
- [1.0, "1001", 1],
- [0.8, "1001", 2],
- [0.1, "1001", 3],
- [0.3, "1002", 1],
- [0.2, "1002", 2],
- [0.1, "1002", 3],
+ [1.0, "1001", "d1", 1],
+ [0.8, "1001", "d2", 2],
+ [0.1, "1001", "d3", 3],
+ [0.3, "1002", "d1", 1],
+ [0.2, "1002", "d2", 2],
+ [0.1, "1002", "d3", 3],
],
columns=[
"score",
"order_id",
+ "driver_id",
"_rank",
],
),
diff --git a/swagger.yaml b/swagger.yaml
index 3bf626b5a..e83dd57f0 100644
--- a/swagger.yaml
+++ b/swagger.yaml
@@ -1380,12 +1380,15 @@ components:
SchemaSpec:
type: object
required:
- - prediction_id_column
- model_prediction_output
- feature_types
properties:
prediction_id_column:
type: string
+ session_id_column:
+ type: string
+ row_id_column:
+ type: string
model_prediction_output:
$ref : '#/components/schemas/ModelPredictionOutput'
tag_columns:
@@ -1427,7 +1430,7 @@ components:
properties:
prediction_score_column:
type: string
- actual_label_column:
+ actual_score_column:
type: string
positive_class_label:
type: string
@@ -1442,13 +1445,10 @@ components:
type: object
required:
- rank_score_column
- - prediction_group_id_column
- output_class
properties:
rank_score_column:
type: string
- prediction_group_id_column:
- type: string
relevance_score_column:
type: string
output_class: