Feast docs (flyteorg#417)
* Feast docs

Signed-off-by: Samhita Alla <[email protected]>

* remove sqlite docs

Signed-off-by: Samhita Alla <[email protected]>
samhita-alla authored Sep 22, 2021
1 parent 349f885 commit a8e90a2
Showing 20 changed files with 153 additions and 582 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/ghcr_push.yml
@@ -27,8 +27,6 @@ jobs:
path: integrations/kubernetes
- name: kfpytorch
path: integrations/kubernetes
-- name: sqlite_datacleaning
-path: case_studies/feature_engineering
- name: sagemaker_training
path: integrations/aws
- name: sagemaker_pytorch
@@ -1,3 +1,3 @@
-PREFIX=feast
+PREFIX=feast_integration
include ../../../common/common.mk
include ../../../common/leaf.mk
@@ -1,21 +1,21 @@
Feast Integration
-----------------

**Feature Engineering** has of late become one of the most prominent topics in Machine Learning.
It is the process of transforming raw data into features that better represent the underlying problem to predictive models, resulting in improved model accuracy on unseen data.

-** `Feast<https://feast.dev/>`_ is an operational data system for managing and serving machine learning features to models in production.**
+`Feast <https://feast.dev/>`__ is an operational data system for managing and serving machine learning features to models in production.

Flyte provides a way to train models and perform feature engineering as a single pipeline.
-But, it provides no way to serve these features to production when the model matures and is ready to be served in production.
+But it provides no way to serve these features to production when the model matures and is ready to be served in production.

This is where Feast comes into the picture.

-Flyte adds the capability of engineering the features and Feast provides the feature registry and online serving system. One thing that Flyte makes possible is incremental development of features and only turning on the sync to online stores when you are confident about the features
+By leveraging the collective capabilities, Flyte adds the ability to engineer the features, and Feast provides the feature registry and online feature serving system.
+Moreover, Flyte can help us ensure incremental development of features and enables us to turn on the sync to online stores only when one is confident about the features.

-In this tutorial, we'll walk through how Feast can be used to store and retrieve features to train and test the model curated using the Flyte pipeline.
+In this tutorial, we'll walk through how Feast can be used to store and retrieve features to train and test the model through a pipeline curated using Flyte.

Dataset
=======
-We'll be using the horse colic dataset wherein we'll determine if the lesion of the horse is surgical or not. This is a modified version of the original dataset.
+We will use the horse colic dataset wherein we will determine if the lesion of the horse is surgical or not. This is a modified version of the original dataset.

The dataset will have the following columns:

@@ -48,19 +48,15 @@ The dataset will have the following columns:
- surgical lesion
- timestamp

-The horse colic dataset will be a compressed zip file consisting of the SQLite DB. For this example we just wanted a dataset that was available online, but this could be easily plugged into another dataset / data management system like Snowflake, Athena, Hive, BigQuery or Spark, all of those are supported by Flyte.
+The horse colic dataset will be a compressed zip file consisting of the SQLite DB.
+For this example, we just wanted a dataset available online, but this could be easily plugged into another dataset/data management system like Snowflake, Athena, Hive, BigQuery, or Spark, all of which are supported by Flyte.
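As a rough illustration of how the SQLite source plugs into the pipeline, the workflow reads the archived DB with flytekit's ``SQLite3Task``; in this sketch, the dataset URI and table name are illustrative assumptions, not the exact values used in this example:

from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
from flytekit.types.schema import FlyteSchema

# Query the compressed SQLite archive; "data" is a placeholder table name.
sql_task = SQLite3Task(
    name="sqlite3.horse_colic",
    query_template="select * from data",
    output_schema_type=FlyteSchema,
    task_config=SQLite3Config(
        uri="https://example.com/horse_colic.db.zip",  # hypothetical location
        compressed=True,
    ),
)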

Takeaways
=========
-The example we're trying to demonstrate is a simple feature engineering job that you can seamlessly construct with Flyte. Here's what the nitty-gritties are:
+The example we are trying to demonstrate is a simple feature engineering job. Here are the nitty-gritties:

#. Source data is from SQL-like data sources
#. Pre-created feature transforms
#. Ability to create a low-code platform
#. Feast integration
#. Serve features to production using Feast
#. TaskTemplate within an imperative workflow

.. tip::

If you're a data scientist, you needn't worry about the infrastructure overhead. Flyte provides an easy-to-use interface which looks just like a typical library.
@@ -1,15 +1,18 @@
"""
Feast Custom Provider
---------------------
Custom provider helps in handling remote Feast manifests within Flyte. It helps Flyte tasks communicate seamlessly.
"""

from datetime import datetime
-from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
+from typing import Callable, List, Union

import pandas
from feast.entity import Entity
from feast.feature_table import FeatureTable
from feast.feature_view import FeatureView
from feast.infra.local import LocalProvider
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig
from flytekit.core.context_manager import FlyteContext
@@ -72,6 +75,7 @@ def _localize_feature_view(self, feature_view: FeatureView):
"""
if not isinstance(feature_view.batch_source, FileSource):
return

# Copy parquet file to a local file
file_source: FileSource = feature_view.batch_source
random_local_path = FlyteContext.current_context().file_access.get_random_local_path(file_source.path)
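    # (The remainder of this method is collapsed in the diff.) A sketch of how
    # the localization likely finishes, assuming flytekit's file_access API:
    # download the remote parquet data to the local path, then point the
    # feature view at a local FileSource.
    FlyteContext.current_context().file_access.get_data(
        file_source.path,
        random_local_path,
        is_multipart=True,
    )
    feature_view.batch_source = FileSource(
        path=random_local_path,
        event_timestamp_column=file_source.event_timestamp_column,
    )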
@@ -1,26 +1,26 @@
-from flytekit.configuration import aws
-from datetime import datetime
-import pandas as pd
+"""
+Feature Store Dataclass
+-----------------------
+This dataclass provides a unified interface to access Feast methods from within a feature store.
+"""

import os
-from typing import Type
from dataclasses import dataclass
+from datetime import datetime
+from typing import Any, Dict, List, Optional, Union

+import pandas as pd
from dataclasses_json import dataclass_json
from feast import repo_config
from feast.feature_store import FeatureStore
-from feast.repo_config import RepoConfig
-from flytekit import FlyteContext
-from flytekit.core.type_engine import TypeEngine, TypeTransformer
-from flytekit.models.literals import Literal, Scalar
-from flytekit.models.types import LiteralType, SimpleType
-from feast.infra.offline_stores.file import FileOfflineStoreConfig
-from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
from feast import FeatureStore as FeastFeatureStore
-from google.protobuf.struct_pb2 import Struct
-from google.protobuf.json_format import MessageToDict
-from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from feast.entity import Entity
-from feast.feature_view import FeatureView
from feast.feature_service import FeatureService
+from feast.feature_view import FeatureView
+from feast.infra.offline_stores.file import FileOfflineStoreConfig
+from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
+from feast.repo_config import RepoConfig
+from flytekit import FlyteContext
+from flytekit.configuration import aws


@dataclass_json
@@ -31,7 +31,6 @@ class FeatureStoreConfig:
s3_bucket: str
online_store_path: str = 'online.db'


@dataclass_json
@dataclass
class FeatureStore:
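The body of the ``FeatureStore`` dataclass is collapsed in this diff. As a minimal sketch of the pattern (the method name and ``provider`` string are assumptions based on the imports and config fields above, not the file's actual contents), the wrapper might construct the underlying Feast object like so:

    config: FeatureStoreConfig

    def _build_feast_feature_store(self) -> FeastFeatureStore:
        # Assemble a Feast RepoConfig from the dataclass fields: the registry
        # lives in S3, and the online store is a local SQLite file.
        return FeastFeatureStore(
            config=RepoConfig(
                project=self.config.project,
                registry=f"s3://{self.config.s3_bucket}/{self.config.registry_path}",
                provider="custom_provider.provider.FlyteCustomProvider",
                offline_store=FileOfflineStoreConfig(),
                online_store=SqliteOnlineStoreConfig(path=self.config.online_store_path),
            )
        )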
@@ -1,40 +1,49 @@
-import os
-from datetime import datetime, timedelta

-from flytekit.core.context_manager import FlyteContext
+"""
+Flyte Pipeline with Feast
+-------------------------
+This workflow makes use of the feature engineering tasks defined in ``feature_eng_tasks.py``. We'll build an end-to-end Flyte pipeline utilizing "Feast".
+Here is the step-by-step process:
+* Fetch the SQLite3 data as a Pandas DataFrame
+* Perform mean-median-imputation
+* Build a feature store
+* Store the updated features in an offline store
+* Retrieve the features from an offline store
+* Perform univariate-feature-selection
+* Train a Naive Bayes model
+* Load features into an online store
+* Fetch one feature vector for inference
+* Generate prediction
+"""

-import random
-import joblib
import logging
+import random
+import typing

# %%
# Let's import the libraries.
+from datetime import datetime, timedelta

+import joblib
import pandas as pd
-from feast import (
-    Entity,
-    Feature,
-    FeatureStore,
-    FeatureView,
-    FileSource,
-    RepoConfig,
-    ValueType,
-    online_response,
-    registry,
-)
+from feast import Entity, Feature, FeatureStore, FeatureView, FileSource, ValueType
+from feast_dataobjects import FeatureStore, FeatureStoreConfig
+from feature_eng_tasks import mean_median_imputer, univariate_selection
+from flytekit import task, workflow
from flytekit.core.node_creation import create_node
-from feast.infra.offline_stores.file import FileOfflineStoreConfig
-from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
-from flytekit import reference_task, task, workflow, Workflow
from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
from flytekit.types.file import JoblibSerializedFile
from flytekit.types.file.file import FlyteFile
from flytekit.types.schema import FlyteSchema
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
-from flytekit.configuration import aws
-from feature_eng_tasks import mean_median_imputer, univariate_selection
-from feast_dataobjects import FeatureStore, FeatureStoreConfig


logger = logging.getLogger(__file__)


# %%
# We define the necessary data holders.

# TODO: find a better way to define these features.
FEAST_FEATURES = [
"horse_colic_stats:rectal temperature",
@@ -61,7 +70,24 @@
),
)


# %%
# We define two tasks, namely ``store_offline`` and ``load_historical_features``, to store and retrieve the historical features.
#
# .. list-table:: Decoding the ``Feast`` Jargon
# :widths: 25 25
#
# * - ``FeatureStore``
# - A FeatureStore object is used to define, create, and retrieve features.
# * - ``Entity``
# - Represents a collection of entities and associated metadata. It's usually the primary key of your data.
# * - ``FeatureView``
# - A FeatureView defines a logical grouping of servable features.
# * - ``FileSource``
# - File data sources allow for the retrieval of historical feature values from files on disk for building training datasets, as well as for materializing features into an online store.
# * - ``apply()``
# - Register objects to metadata store and update related infrastructure.
# * - ``get_historical_features()``
# - Enrich an entity dataframe with historical feature values for either training or batch scoring.
@task
def store_offline(feature_store: FeatureStore, dataframe: FlyteSchema):
horse_colic_entity = Entity(name="Hospital Number", value_type=ValueType.STRING)
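    # (The rest of this task is collapsed in the diff.) A rough sketch of how
    # it likely continues, per the Feast jargon table above: declare a
    # FeatureView over a FileSource and register both objects with apply().
    # The feature list and TTL below are illustrative, not the exact ones.
    horse_colic_feature_view = FeatureView(
        name="horse_colic_stats",
        entities=["Hospital Number"],
        features=[Feature(name="rectal temperature", dtype=ValueType.FLOAT)],
        batch_source=FileSource(
            path=str(dataframe.remote_path),
            event_timestamp_column="timestamp",
        ),
        ttl=timedelta(days=1),
    )
    feature_store.apply([horse_colic_entity, horse_colic_feature_view])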
@@ -127,7 +153,7 @@ def load_historical_features(feature_store: FeatureStore) -> FlyteSchema:
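    # This task's body is collapsed above. A minimal sketch, assuming the
    # wrapper exposes Feast's get_historical_features(); the entity rows and
    # timestamps below are illustrative:
    entity_df = pd.DataFrame(
        {
            "Hospital Number": ["530101", "529461"],
            "event_timestamp": [datetime(2021, 6, 25), datetime(2021, 7, 5)],
        }
    )
    return feature_store.get_historical_features(
        entity_df=entity_df, features=FEAST_FEATURES
    )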


# %%
-# Next, we train the Naive Bayes model using the data that's been fetched from the feature store.
+# Next, we train a Naive Bayes model using the data from the feature store.
@task
def train_model(dataset: pd.DataFrame, data_class: str) -> JoblibSerializedFile:
X_train, _, y_train, _ = train_test_split(
@@ -143,6 +169,22 @@ def train_model(dataset: pd.DataFrame, data_class: str) -> JoblibSerializedFile:
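    # (Collapsed above) a sketch of this task's likely middle, assuming
    # standard scikit-learn usage; the filename is an illustrative choice:
    #     model = GaussianNB()
    #     model.fit(X_train, y_train)
    #     fname = "model.joblib.dat"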
joblib.dump(model, fname)
return fname

# %%
# To perform inferencing, we define two tasks: ``store_online`` and ``retrieve_online``.
#
# .. list-table:: Decoding the ``Feast`` Jargon
# :widths: 25 25
#
# * - ``materialize()``
# - Materialize data from the offline store into the online store.
# * - ``get_online_features()``
# - Retrieves the latest online feature data.
#
# .. note::
# One key difference between an online and offline store is that only the latest feature values are stored per entity key in an
# online store, unlike an offline store where all feature values are stored.
# Our dataset has two such entries with the same ``Hospital Number`` but different timestamps.
# Only the data point with the latest timestamp will be stored in the online store.
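# %%
# A minimal sketch of the underlying Feast calls these two tasks likely wrap
# (the time window and entity value are illustrative assumptions):
#
#     feature_store.materialize(
#         start_date=datetime.utcnow() - timedelta(days=250),
#         end_date=datetime.utcnow(),
#     )
#     feature_vector = feature_store.get_online_features(
#         FEAST_FEATURES, [{"Hospital Number": "533738"}]
#     )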
@task
def store_online(feature_store: FeatureStore):
feature_store.materialize(
@@ -162,7 +204,7 @@ def retrieve_online(


# %%
-# We define a task to test the model using the inference point fetched earlier.
+# We define a task to test our model using the inference point fetched earlier.
@task
def test_model(
model_ser: JoblibSerializedFile,
@@ -179,7 +221,8 @@ def test_model(
prediction = model.predict([test_list])
return prediction


# %%
# Next, we need to convert the timestamp column in the underlying dataframe; otherwise, its type is written as a string.
@task
def convert_timestamp_column(
dataframe: FlyteSchema, timestamp_column: str
@@ -188,12 +231,15 @@ def convert_timestamp_column(
df[timestamp_column] = pd.to_datetime(df[timestamp_column])
return df

# %%
# The ``build_feature_store`` task provides a handle to access Feast methods by building a feature store.
@task
def build_feature_store(s3_bucket: str, registry_path: str, online_store_path: str) -> FeatureStore:
feature_store_config = FeatureStoreConfig(project="horsecolic", s3_bucket=s3_bucket, registry_path=registry_path, online_store_path=online_store_path)
return FeatureStore(config=feature_store_config)


# %%
# Finally, we define a workflow that streamlines the whole pipeline building and feature serving process.
@workflow
def feast_workflow(
imputation_method: str = "mean",
@@ -202,15 +248,19 @@
registry_path: str = "registry.db",
online_store_path: str = "online.db",
) -> typing.List[str]:

# Load parquet file from sqlite task
df = sql_task()

# Perform mean-median imputation
dataframe = mean_median_imputer(dataframe=df, imputation_method=imputation_method)
-# Need to convert timestamp column in the underlying dataframe, otherwise its type is written as
-# string. There is probably a better way of doing this conversion.

+# Convert timestamp column from string to datetime.
converted_df = convert_timestamp_column(
dataframe=dataframe, timestamp_column="timestamp"
)

# Build feature store
feature_store = build_feature_store(s3_bucket=s3_bucket, registry_path=registry_path, online_store_path=online_store_path)

# Ingest data into offline store
@@ -230,23 +280,29 @@
load_historical_features_node >> store_online_node
store_online_node >> retrieve_online_node
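    # The nodes above are likely created with create_node() (imported earlier)
    # so that tasks can be ordered explicitly with >>; a sketch of the pattern,
    # with illustrative arguments:
    #     store_online_node = create_node(store_online, feature_store=feature_store)
    #     retrieve_online_node = create_node(retrieve_online, feature_store=feature_store)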

-# Use a feature retrieved from the online store for inference on a trained model
# Perform univariate feature selection
selected_features = univariate_selection(
dataframe=load_historical_features_node.o0,
num_features=num_features_univariate,
data_class=DATA_CLASS,
)

# Train the Naive Bayes model
trained_model = train_model(
dataset=selected_features,
data_class=DATA_CLASS,
)

+# Use a feature retrieved from the online store for inference
prediction = test_model(
model_ser=trained_model,
inference_point=retrieve_online_node.o0,
)

return prediction


if __name__ == "__main__":
print(f"{feast_workflow()}")

# %%
# You should see the prediction against the test input as the workflow output.
