Skip to content

Commit

Permalink
Add documentation for model schema and model observability
Browse files Browse the repository at this point in the history
  • Loading branch information
tiopramayudi committed Feb 1, 2024
1 parent 2183242 commit e16e67c
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 84 deletions.
4 changes: 4 additions & 0 deletions docs/diagrams/model_observability.drawio.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 0 additions & 3 deletions docs/user/templates/09_model_observability.md

This file was deleted.

90 changes: 90 additions & 0 deletions docs/user/templates/09_model_schema.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<!-- page-title: Model Schema -->

# Model Schema

Model schema is a specification of input and output of a model, such as what are the features columns, prediction columns and also ground truth columns. Following are the fields in model schema:

| Field | Type | Description | Mandatory |
|-------|------|-------------|-----------|
| `id` | int | Unique identifier for each model schema | Not mandatory, if ID is not specified it will create new model schema otherwise it will update the model schema with corresponding ID |
| `model_id`| int | Model ID that correlate with the schema | Not mandatory, if not specified the SDK will assign it with the model that user set |
| `spec` | InferenceSchema | Detail specification for model schema | True |

Detail specification is defined by using `InferenceSchema` class, following are the fields:
| Field | Type | Description | Mandatory |
|-------|------|-------------|-----------|
| `feature_types` | Dict[str, ValueType] | Mapping between feature name with the type of the feature | True |
| `model_prediction_output` | PredictionOutput | Prediction specification that differ between model types, e.g BinaryClassificationOutput, RegressionOutput, RankingOutput | True |
| `prediction_id_column` | str | The column name that contains prediction id value | True |
| `tag_columns` | Optional[List[str]] | List of column names that contains additional information about prediction, you can treat it as metadata | False |

From above we can see `model_prediction_output` field that has type `PredictionOutput`, this field is a specification of prediction that is generated by the model depending on it's model type. Currently we support 3 model types in the schema:
* Binary Classification
* Regression
* Ranking

Each model type has it's own model prediction output specification.

## Binary Classification
Model prediction output specification for Binary Classification type is `BinaryClassificationOutput` that has following fields:

| Field | Type | Description | Mandatory |
|-------|------|-------------|-----------|
| `prediction_score_column` | str | Column that contains prediction score value of a model. Prediction score must be between 0.0 and 1.0 | True |
| `actual_label_column` | str | Name of the column containing the actual class | False, because not all model has the ground truth |
| `positive_class_label` | str | Label for positive class | True |
| `negative_class_label` | str | Label for negative class | True |
| `score_threshold` | float | Score threshold for prediction to be considered as positive class | False, if not specified it will use 0.5 as default |

## Regression
Model prediction output specification for Regression type is `RegressionOutput` that has following fields:

| Field | Type | Description | Mandatory |
|-------|------|-------------|-----------|
| `prediction_score_column` | str | Column that contains prediction score value of a model | True |
| `actual_score_column` | str | Name of the column containing the actual score | False, because not all model has the ground truth |


## Ranking
Model prediction output specification for Ranking type is `RankingOutput` that has following fields:

| Field | Type | Description | Mandatory |
|-------|------|-------------|-----------|
| `rank_score_column` | str | Name of the column containing the ranking score of the prediction | True |
| `prediction_group_id_column` | str | Name of the column containing the prediction group id | True |
| `relevance_score_column` | str | Name of the column containing the relevance score of the prediction | True |

## Define model schema
From the specification above, users can create the schema for their model. Suppose that users have binary classification model, that has 4 features
* featureA that has float type
* featureB that has int type
* featureC that has string type
* featureD that has float type

With positive class `complete` and negative class `non_complete` and the threshold for positive class is 0.75. Actual label is stored under column `target`, `prediction_score` under column `score` `prediction_id` under column `prediction_id`. From that specification, users can define the model schema and put it alongside version creation. Below is the example snipped code

```python
from merlin.model_schema import ModelSchema
from merlin.observability.inference import InferenceSchema, ValueType, BinaryClassificationOutput
model_schema = ModelSchema(spec=InferenceSchema(
feature_types={
"featureA": ValueType.FLOAT64,
"featureB": ValueType.INT64,
"featureC": ValueType.STRING,
"featureD": ValueType.BOOLEAN
},
prediction_id_column="prediction_id",
model_prediction_output=BinaryClassificationOutput(
prediction_score_column="score",
actual_label_column="target",
positive_class_label="complete",
negative_class_label="non_complete",
score_threshold=0.75
)
))
with merlin.new_model_version(model_schema=model_schema) as v:
....

```

The above snipped code will define model schema and attach it to certain model version, the reason is the schema for each version is possible to differ.
99 changes: 99 additions & 0 deletions docs/user/templates/10_model_observability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<!-- page-title: Model Observability -->
# Model Observability
Model observability enable model's owners to observe and analyze their model in production by looking at the performance and drift metrics. Performance indicate how well your model to do prediction compare to the actual output, and drift indicate the difference of distribution between two datasets. To calculate those metrics the model observability system needs the following data:
* Features data. The features data that is supplied to the model to do prediction
* Prediction data. The prediction as output of your model
* Ground truth / Actual data. The actual value of thing that your model try to predict

Those data can be collected from training phase and serving phase (production). Data that is collected on the training phase is used as the baseline dataset, we can refer it as training dataset. For data during serving phase we can refer it as production dataset, this data must be emitted by the model. By default the merlin model is not emitting any of those data, hence model observability is not enabled by default. However, merlin provides a way so model can emit such data but currently it is limited only for PyFunc model. The way is to turn on the flag of `ENABLE_MODEL_OBSERVABILITY` and modify the PyFunc model to returning model input (features) and model output (prediction output), more detail will be explained in the `Onboarding` section.

## Architecture

![architecture](../../diagrams/model_observability.drawio.svg)

From above architecture diagram, we can see that there are three places where the data is published to model observability system
* Model deployment workflow. Especially after model training step is completed. This step is publishing training dataset as baseline dataset
* Model serving. PyFunc model will emit features and predictions data to a topic in a kafka cluster, and separate kafka consumer consume corresponding topic and publish the data to model observability system. Kafka consumer also store the data into separate BQ table that later will be used to be joined with user ground truth BQ table.
* Ground truth collector workflow. This workflow primary objective is to publish ground truth or actual for a prediction

## Onboarding
As the architecture diagram illustrate, the end to end model onboarding to model observability needs to involving several components. The scope of this section is limited to merlin model modification. {{ workflow_scope_explaination }}

### ### PyFunc modification
Currently the only supported model for model observability is PyFunc model, the model should implements class `PyFuncV3Model` instead of `PyFuncModel`. This `PyFuncV3Model` has difference method signature that must be implemented. Following are the new methods:
| Method Name | Description |
|-------------|-------------|
| `preprocess(self, request: dict, **kwargs) -> ModelInput` | Doing preprocessing that returning all the required features for prediction. Must be implemented if using `HTTP_JSON` protocol |
| `postprocess(self, model_output: ModelOutput, request: dict) -> dict` | Postprocessing basically do additional processing to construct end result of the overall model. Must be implemented if using `HTTP_JSON` protocol |
| `upiv1_preprocess(self, request: upi_pb2.PredictValuesRequest, context: grpc.ServicerContext) -> ModelInput` | Preprocess method signature that only called when using `UPI_V1` protocol. Must be implemented if using `UPI_V1` protocol |
| `upiv1_postprocess(self, model_output: ModelOutput, request: upi_pb2.PredictValuesRequest) -> upi_pb2.PredictValuesResponse` | Postprocess method signature that only callend when using `UPI_V1` protocol. Must be implemented if using `UPI_V1` protocol |

Beside changes in signature, you can see some of those methods returning new type, `ModelInput` and `ModelOutput`. `ModelInput` is a class that represents input information of the models, this class contains following fields:
| Field | Type | Description|
|-------|------|------------|
| `prediction_ids` | List[str] | Unique identifier for each prediction |
| `features` | Union[Values, pandas.DataFrame] | Features value that is used by the model to generate prediction. Length of features should be the same with `prediction_ids` |
| `entities` | Optional[Union[Values, pandas.DataFrame]] | Additional data that are not used for prediction, but this data is used to retrieved another features, e.g `driver_id`, we can retrieve features associated with certain `driver_id`|
| `session_id` | str | Identifier for the request. This value will be used together with `prediction_ids` as prediction identifier in model observability system |

`ModelInput` data is essential for model observability since it contains features values and identifier of prediction. Features values are used to calculate feature drift, and identifier is used as join key between features, prediction data with ground truth data. On the other hand, `ModelOutput` is the class that represent raw model prediction output, not the final output of PyFunc model. `ModelOutput` class contains following fields:
| Field | Type | Description |
|-------|------|-------------|
| `prediction` | Values | `predictions` contains prediction output from ml_predict, it may contains multiple columns e.g for multiclass classification or for binary classification that contains prediction score and label |
| `prediction_ids` | List[str] | Unique identifier for each prediction output |

Same like `ModelInput`, `ModelOutput` is also essential for model observability, it can be used to calculate prediction drift but more importantly it can calculate performance metrics.

### Configure Model Schema

Model schema is essential for model observability because it is used by the kafka consumer to choose which columns that is relevant to model observability and do necessary preprocessing before publishing the data to model observability system. Users can see more detail of configuring model schema [here](../templates/09_model_schema.md)

### Deployment
There is not much change on the deployment part, users just needs to set `enable_model_observability` parameter to `True` during model deploy. For clarity, we take one use case for model observability example, suppose a model has 4 features:
* featureA that has float type
* featureB that has int type
* featureC that has string type
* featureD that has float type

The model type is ranking with prediction group id information is located in `session_id` column, prediction id in `prediction_id` column, rank score in `score` column and `relevance_score_column` in `relevance_score`. Below is the snipped of the python code

```python
class ModelObservabilityModel(PyFuncV3Model):

def preprocess(self, request: dict, **kwargs) -> ModelInput:
return ModelInput(
session_id="session_id",
prediction_ids=["prediction_1", "prediction_2"],
features=pd.DataFrame([[0.7, 200, "ID", True], [0.99, 250, "SG", False]], columns=["featureA", "featureB", "featureC", "featureD"]),
)

def infer(self, model_input: ModelInput) -> ModelOutput:
return ModelOutput(
prediction_ids=model_input.prediction_ids,
predictions=Values(columns=["score"], data=[[0.5], [0.9]]),
)
def postprocess(self, model_output: ModelOutput, request: dict) -> dict:
return {"predictions": model_output.predictions.data}


model_schema = ModelSchema(spec=InferenceSchema(
feature_types={
"featureA": ValueType.FLOAT64,
"featureB": ValueType.INT64,
"featureC": ValueType.STRING,
"featureD": ValueType.BOOLEAN
},
prediction_id_column="prediction_id",
model_prediction_output=RankingOutput(
rank_score_column="score",
prediction_group_id_column="session_id",
relevance_score_column="relevance_score"
)
))
with merlin.new_model_version(model_schema=model_schema) as v:
v.log_pyfunc_model(model_instance=ModelObservabilityModel(),
conda_env="env.yaml",
code_dir=["src"],
artifacts={"model": ARTIFACT_PATH})
endpoint = merlin.deploy(v, enable_model_observability=True)
```
5 changes: 4 additions & 1 deletion python/sdk/merlin/autoscaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class MetricsType(Enum):
MEMORY_UTILIZATION: percentage of Memory utilization.
RPS: throughput in request per second.
"""

CONCURRENCY = "concurrency"
CPU_UTILIZATION = "cpu_utilization"
MEMORY_UTILIZATION = "memory_utilization"
Expand Down Expand Up @@ -43,5 +44,7 @@ def target_value(self) -> float:
return self._target_value


RAW_DEPLOYMENT_DEFAULT_AUTOSCALING_POLICY = AutoscalingPolicy(MetricsType.CPU_UTILIZATION, 50)
RAW_DEPLOYMENT_DEFAULT_AUTOSCALING_POLICY = AutoscalingPolicy(
MetricsType.CPU_UTILIZATION, 50
)
SERVERLESS_DEFAULT_AUTOSCALING_POLICY = AutoscalingPolicy(MetricsType.CONCURRENCY, 1)
Loading

0 comments on commit e16e67c

Please sign in to comment.