From a8e90a25295633eee2d960bde46cecd8cabcd94e Mon Sep 17 00:00:00 2001 From: Samhita Alla Date: Wed, 22 Sep 2021 14:13:05 +0530 Subject: [PATCH] Feast docs (#417) * Feast docs Signed-off-by: Samhita Alla * remove sqlite docs Signed-off-by: Samhita Alla --- .github/workflows/ghcr_push.yml | 2 - .../feast_integration/Makefile | 2 +- .../feast_integration/README.rst | 26 ++-- .../custom_provider/provider.py | 14 +- .../feast_integration/feast_dataobjects.py | 35 +++-- .../feast_integration/feast_workflow.py | 124 +++++++++++----- .../feast_integration/feature_eng_tasks.py | 12 +- .../sqlite_datacleaning/Dockerfile | 51 ------- .../sqlite_datacleaning/Makefile | 3 - .../sqlite_datacleaning/README.rst | 90 ------------ .../sqlite_datacleaning/__init__.py | 0 .../sqlite_datacleaning/datacleaning_tasks.py | 58 -------- .../datacleaning_workflow.py | 133 ----------------- .../sqlite_datacleaning/requirements.in | 3 - .../sqlite_datacleaning/requirements.txt | 136 ------------------ .../sqlite_datacleaning/sandbox.config | 7 - cookbook/docs/conf.py | 13 +- cookbook/docs/feature_engineering.rst | 17 ++- cookbook/docs/tutorials.rst | 5 - cookbook/flyte_tests_manifest.json | 4 +- 20 files changed, 153 insertions(+), 582 deletions(-) delete mode 100644 cookbook/case_studies/feature_engineering/sqlite_datacleaning/Dockerfile delete mode 100644 cookbook/case_studies/feature_engineering/sqlite_datacleaning/Makefile delete mode 100644 cookbook/case_studies/feature_engineering/sqlite_datacleaning/README.rst delete mode 100644 cookbook/case_studies/feature_engineering/sqlite_datacleaning/__init__.py delete mode 100644 cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_tasks.py delete mode 100644 cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_workflow.py delete mode 100644 cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.in delete mode 100644 cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.txt delete mode 100644 cookbook/case_studies/feature_engineering/sqlite_datacleaning/sandbox.config diff --git a/.github/workflows/ghcr_push.yml b/.github/workflows/ghcr_push.yml index d5b83e6c0d..3bd43f84cb 100644 --- a/.github/workflows/ghcr_push.yml +++ b/.github/workflows/ghcr_push.yml @@ -27,8 +27,6 @@ jobs: path: integrations/kubernetes - name: kfpytorch path: integrations/kubernetes - - name: sqlite_datacleaning - path: case_studies/feature_engineering - name: sagemaker_training path: integrations/aws - name: sagemaker_pytorch diff --git a/cookbook/case_studies/feature_engineering/feast_integration/Makefile b/cookbook/case_studies/feature_engineering/feast_integration/Makefile index 1e9c73693f..5c7acf1126 100644 --- a/cookbook/case_studies/feature_engineering/feast_integration/Makefile +++ b/cookbook/case_studies/feature_engineering/feast_integration/Makefile @@ -1,3 +1,3 @@ -PREFIX=feast +PREFIX=feast_integration include ../../../common/common.mk include ../../../common/leaf.mk diff --git a/cookbook/case_studies/feature_engineering/feast_integration/README.rst b/cookbook/case_studies/feature_engineering/feast_integration/README.rst index a65a09960c..eccc20c423 100644 --- a/cookbook/case_studies/feature_engineering/feast_integration/README.rst +++ b/cookbook/case_studies/feature_engineering/feast_integration/README.rst @@ -1,21 +1,21 @@ Feast Integration ----------------- -**Feature Engineering** off-late has become one of the most prominent topics in Machine Learning. 
-It is the process of transforming raw data into features that better represent the underlying problem to the predictive models, resulting in improved model accuracy on unseen data.
-
-** `Feast`_ is an operational data system for managing and serving machine learning features to models in production.**
+`Feast `__ is an operational data system for managing and serving machine learning features to models in production.
 
 Flyte provides a way to train models and perform feature engineering as a single pipeline.
-But, it provides no way to serve these features to production when the model matures and is ready to be served in production.
+But it provides no way to serve these features once the model matures and is ready for production.
+
+This is where Feast comes into the picture.
 
-Flyte adds the capability of engineering the features and Feast provides the feature registry and online serving system. One thing that Flyte makes possible is incremental development of features and only turning on the sync to online stores when you are confident about the features
+By combining the two, Flyte takes care of engineering the features, while Feast provides the feature registry and the online feature serving system.
+Moreover, Flyte supports incremental development of features: the sync to the online store can be turned on only when you are confident about the features.
 
-In this tutorial, we'll walk through how Feast can be used to store and retrieve features to train and test the model curated using the Flyte pipeline.
+In this tutorial, we'll walk through how Feast can be used to store and retrieve features to train and test a model through a pipeline curated using Flyte.
 
 Dataset
 =======
-We'll be using the horse colic dataset wherein we'll determine if the lesion of the horse is surgical or not. This is a modified version of the original dataset.
+We will use the horse colic dataset, wherein we will determine if a horse's lesion is surgical or not. This is a modified version of the original dataset.
 
 The dataset will have the following columns:
 
@@ -48,19 +48,15 @@ The dataset will have the following columns:
   - surgical lesion
   - timestamp
 
-The horse colic dataset will be a compressed zip file consisting of the SQLite DB. For this example we just wanted a dataset that was available online, but this could be easily plugged into another dataset / data management system like Snowflake, Athena, Hive, BigQuery or Spark, all of those are supported by Flyte.
+The horse colic dataset will be a compressed zip file consisting of the SQLite DB.
+For this example, we just wanted a dataset available online, but this could easily be swapped for another data management system like Snowflake, Athena, Hive, BigQuery, or Spark, all of which are supported by Flyte.
 
 Takeaways
 =========
-The example we're trying to demonstrate is a simple feature engineering job that you can seamlessly construct with Flyte. Here's what the nitty-gritties are:
+The example demonstrates a simple feature engineering job. Here is what it covers:
 
 #. Source data is from SQL-like data sources
 #. Pre-created feature transforms
 #. Ability to create a low-code platform
 #. Feast integration
 #. Serve features to production using Feast
-#. TaskTemplate within an imperative workflow
-
-.. tip::
-
-   If you're a data scientist, you needn't worry about the infrastructure overhead. Flyte provides an easy-to-use interface which looks just like a typical library.
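
To make these pieces concrete, here is a minimal sketch of the Feast objects the tutorial registers (mirroring names that appear later in ``feast_workflow.py``; the parquet path, TTL, and feature dtype below are illustrative, and exact signatures depend on the Feast release in use):

.. code-block:: python

    from datetime import timedelta

    from feast import Entity, Feature, FeatureView, FileSource, ValueType

    # The entity is the key used to look up features -- here, the hospital number.
    horse_colic_entity = Entity(name="Hospital Number", value_type=ValueType.STRING)

    # A FeatureView groups features backed by a batch (offline) source.
    horse_colic_feature_view = FeatureView(
        name="horse_colic_stats",
        entities=["Hospital Number"],
        features=[Feature(name="rectal temperature", dtype=ValueType.FLOAT)],
        batch_source=FileSource(
            path="s3://feast-demo/horse_colic.parquet",  # illustrative path
            event_timestamp_column="timestamp",
        ),
        ttl=timedelta(days=1),
    )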
diff --git a/cookbook/case_studies/feature_engineering/feast_integration/custom_provider/provider.py b/cookbook/case_studies/feature_engineering/feast_integration/custom_provider/provider.py index b124edab46..9ab6252eff 100644 --- a/cookbook/case_studies/feature_engineering/feast_integration/custom_provider/provider.py +++ b/cookbook/case_studies/feature_engineering/feast_integration/custom_provider/provider.py @@ -1,15 +1,18 @@ +""" +Feast Custom Provider +--------------------- + +Custom provider helps in handling remote Feast manifests within Flyte. It helps Flyte tasks communicate seamlessly. +""" + from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Callable, List, Union import pandas -from feast.entity import Entity -from feast.feature_table import FeatureTable from feast.feature_view import FeatureView from feast.infra.local import LocalProvider from feast.infra.offline_stores.file_source import FileSource from feast.infra.offline_stores.offline_store import RetrievalJob -from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry from feast.repo_config import RepoConfig from flytekit.core.context_manager import FlyteContext @@ -72,6 +75,7 @@ def _localize_feature_view(self, feature_view: FeatureView): """ if not isinstance(feature_view.batch_source, FileSource): return + # Copy parquet file to a local file file_source: FileSource = feature_view.batch_source random_local_path = FlyteContext.current_context().file_access.get_random_local_path(file_source.path) diff --git a/cookbook/case_studies/feature_engineering/feast_integration/feast_dataobjects.py b/cookbook/case_studies/feature_engineering/feast_integration/feast_dataobjects.py index 891c95aac0..ddc630dcec 100644 --- a/cookbook/case_studies/feature_engineering/feast_integration/feast_dataobjects.py +++ b/cookbook/case_studies/feature_engineering/feast_integration/feast_dataobjects.py @@ -1,26 +1,26 @@ -from flytekit.configuration import aws -from datetime import datetime -import pandas as pd +""" +Feature Store Dataclass +----------------------- + +This dataclass provides a unified interface to access Feast methods from within a feature store. 
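+It is decorated with ``dataclass_json``, so Flyte can serialize a feature store instance and pass it between tasks.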
+""" + import os -from typing import Type from dataclasses import dataclass +from datetime import datetime +from typing import Any, Dict, List, Optional, Union + +import pandas as pd from dataclasses_json import dataclass_json -from feast import repo_config -from feast.feature_store import FeatureStore -from feast.repo_config import RepoConfig -from flytekit import FlyteContext -from flytekit.core.type_engine import TypeEngine, TypeTransformer -from flytekit.models.literals import Literal, Scalar -from flytekit.models.types import LiteralType, SimpleType -from feast.infra.offline_stores.file import FileOfflineStoreConfig -from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig from feast import FeatureStore as FeastFeatureStore -from google.protobuf.struct_pb2 import Struct -from google.protobuf.json_format import MessageToDict -from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from feast.entity import Entity -from feast.feature_view import FeatureView from feast.feature_service import FeatureService +from feast.feature_view import FeatureView +from feast.infra.offline_stores.file import FileOfflineStoreConfig +from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig +from feast.repo_config import RepoConfig +from flytekit import FlyteContext +from flytekit.configuration import aws @dataclass_json @@ -31,7 +31,6 @@ class FeatureStoreConfig: s3_bucket: str online_store_path: str = 'online.db' - @dataclass_json @dataclass class FeatureStore: diff --git a/cookbook/case_studies/feature_engineering/feast_integration/feast_workflow.py b/cookbook/case_studies/feature_engineering/feast_integration/feast_workflow.py index 6bdb082135..de1fc7a8ed 100644 --- a/cookbook/case_studies/feature_engineering/feast_integration/feast_workflow.py +++ b/cookbook/case_studies/feature_engineering/feast_integration/feast_workflow.py @@ -1,40 +1,49 @@ -import os -from datetime import datetime, timedelta - -from flytekit.core.context_manager import FlyteContext +""" +Flyte Pipeline with Feast +------------------------- + +This workflow makes use of the feature engineering tasks defined in the other file. We'll build an end-to-end Flyte pipeline utilizing "Feast". +Here is the step-by-step process: + +* Fetch the SQLite3 data as a Pandas DataFrame +* Perform mean-median-imputation +* Build a feature store +* Store the updated features in an offline store +* Retrieve the features from an offline store +* Perform univariate-feature-selection +* Train a Naive Bayes model +* Load features into an online store +* Fetch one feature vector for inference +* Generate prediction +""" -import random -import joblib import logging +import random import typing + +# %% +# Let's import the libraries. 
+from datetime import datetime, timedelta
+
+import joblib
 import pandas as pd
-from feast import (
-    Entity,
-    Feature,
-    FeatureStore,
-    FeatureView,
-    FileSource,
-    RepoConfig,
-    ValueType,
-    online_response,
-    registry,
-)
+from feast import Entity, Feature, FeatureStore, FeatureView, FileSource, ValueType
+from feast_dataobjects import FeatureStore, FeatureStoreConfig
+from feature_eng_tasks import mean_median_imputer, univariate_selection
+from flytekit import task, workflow
 from flytekit.core.node_creation import create_node
-from feast.infra.offline_stores.file import FileOfflineStoreConfig
-from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
-from flytekit import reference_task, task, workflow, Workflow
 from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
 from flytekit.types.file import JoblibSerializedFile
-from flytekit.types.file.file import FlyteFile
 from flytekit.types.schema import FlyteSchema
 from sklearn.model_selection import train_test_split
 from sklearn.naive_bayes import GaussianNB
-from flytekit.configuration import aws
 
-from feature_eng_tasks import mean_median_imputer, univariate_selection
-from feast_dataobjects import FeatureStore, FeatureStoreConfig
-
 logger = logging.getLogger(__file__)
+
+
+# %%
+# We define the necessary data holders.
+
 # TODO: find a better way to define these features.
 FEAST_FEATURES = [
     "horse_colic_stats:rectal temperature",
@@ -61,7 +70,24 @@
     ),
 )
 
-
+# %%
+# We define two tasks, namely ``store_offline`` and ``load_historical_features``, to store and retrieve the historical features.
+#
+# .. list-table:: Decoding the ``Feast`` Jargon
+#    :widths: 25 25
+#
+#    * - ``FeatureStore``
+#      - A FeatureStore object is used to define, create, and retrieve features.
+#    * - ``Entity``
+#      - Represents a collection of entities and associated metadata. It's usually the primary key of your data.
+#    * - ``FeatureView``
+#      - A FeatureView defines a logical grouping of servable features.
+#    * - ``FileSource``
+#      - File data sources allow for the retrieval of historical feature values from files on disk for building training datasets, as well as for materializing features into an online store.
+#    * - ``apply()``
+#      - Register objects to the metadata store and update related infrastructure.
+#    * - ``get_historical_features()``
+#      - Enrich an entity dataframe with historical feature values for either training or batch scoring.
 @task
 def store_offline(feature_store: FeatureStore, dataframe: FlyteSchema):
     horse_colic_entity = Entity(name="Hospital Number", value_type=ValueType.STRING)
@@ -127,7 +153,7 @@ def load_historical_features(feature_store: FeatureStore) -> FlyteSchema:
 
 
 # %%
-# Next, we train the Naive Bayes model using the data that's been fetched from the feature store.
+# Next, we train a Naive Bayes model using the data from the feature store.
 @task
 def train_model(dataset: pd.DataFrame, data_class: str) -> JoblibSerializedFile:
     X_train, _, y_train, _ = train_test_split(
@@ -143,6 +169,22 @@ def train_model(dataset: pd.DataFrame, data_class: str) -> JoblibSerializedFile:
     joblib.dump(model, fname)
     return fname
 
+# %%
+# To perform inference, we define two tasks: ``store_online`` and ``retrieve_online``.
+#
+# .. list-table:: Decoding the ``Feast`` Jargon
+#    :widths: 25 25
+#
+#    * - ``materialize()``
+#      - Materialize data from the offline store into the online store.
+#    * - ``get_online_features()``
+#      - Retrieves the latest online feature data.
+#
+# .. note::
+#    One key difference between an online and an offline store is that only the latest feature values are stored per entity key in an
+#    online store, unlike an offline store, where all feature values are stored.
+#    Our dataset has two such entries with the same ``Hospital Number`` but different timestamps.
+#    Only the data point with the latest timestamp will be stored in the online store.
 @task
 def store_online(feature_store: FeatureStore):
     feature_store.materialize(
@@ -162,7 +204,7 @@ def retrieve_online(
 
 
 # %%
-# We define a task to test the model using the inference point fetched earlier.
+# We define a task to test our model using the inference point fetched earlier.
 @task
 def test_model(
     model_ser: JoblibSerializedFile,
@@ -179,7 +221,8 @@ def test_model(
     prediction = model.predict([test_list])
     return prediction
 
-
+# %%
+# Next, we need to convert the timestamp column in the underlying dataframe; otherwise, its type is written as a string.
 @task
 def convert_timestamp_column(
     dataframe: FlyteSchema, timestamp_column: str
@@ -188,12 +231,15 @@ def convert_timestamp_column(
     df[timestamp_column] = pd.to_datetime(df[timestamp_column])
     return df
 
+# %%
+# The ``build_feature_store`` task is the gateway to the Feast methods: it builds and returns a feature store.
 @task
 def build_feature_store(s3_bucket: str, registry_path: str, online_store_path: str) -> FeatureStore:
     feature_store_config = FeatureStoreConfig(project="horsecolic", s3_bucket=s3_bucket, registry_path=registry_path, online_store_path=online_store_path)
     return FeatureStore(config=feature_store_config)
 
-
+# %%
+# Finally, we define a workflow that streamlines the whole pipeline-building and feature-serving process.
 @workflow
 def feast_workflow(
     imputation_method: str = "mean",
@@ -202,15 +248,19 @@ def feast_workflow(
     registry_path: str = "registry.db",
     online_store_path: str = "online.db",
 ) -> typing.List[str]:
+    # Load the parquet file from the SQLite task
     df = sql_task()
+
+    # Perform mean-median imputation
     dataframe = mean_median_imputer(dataframe=df, imputation_method=imputation_method)
-    # Need to convert timestamp column in the underlying dataframe, otherwise its type is written as
-    # string. There is probably a better way of doing this conversion.
+
+    # Convert the timestamp column from string to datetime.
     converted_df = convert_timestamp_column(
         dataframe=dataframe, timestamp_column="timestamp"
     )
 
+    # Build the feature store
     feature_store = build_feature_store(s3_bucket=s3_bucket, registry_path=registry_path, online_store_path=online_store_path)
 
     # Ingest data into offline store
@@ -230,16 +280,20 @@
     load_historical_features_node >> store_online_node
     store_online_node >> retrieve_online_node
 
-    # Use a feature retrieved from the online store for inference on a trained model
+    # Perform univariate feature selection
     selected_features = univariate_selection(
         dataframe=load_historical_features_node.o0,
         num_features=num_features_univariate,
         data_class=DATA_CLASS,
     )
+
+    # Train the Naive Bayes model
     trained_model = train_model(
         dataset=selected_features,
         data_class=DATA_CLASS,
     )
+
+    # Use a feature retrieved from the online store for inference
     prediction = test_model(
         model_ser=trained_model,
         inference_point=retrieve_online_node.o0,
@@ -247,6 +301,8 @@
 
     return prediction
 
-
 if __name__ == "__main__":
     print(f"{feast_workflow()}")
+
+# %%
+# You should see the prediction against the test input as the workflow output.
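
For reference, the online round trip that ``store_online`` and ``retrieve_online`` wrap boils down to two Feast calls. This is a sketch against a plain Feast ``FeatureStore`` (the repo path, feature reference, and entity value are illustrative, and argument names vary slightly across Feast releases):

.. code-block:: python

    from datetime import datetime, timedelta

    from feast import FeatureStore

    store = FeatureStore(repo_path=".")  # illustrative repo path

    # Copy the latest feature values per entity key from the offline store
    # into the online store.
    store.materialize(
        start_date=datetime.utcnow() - timedelta(days=250),
        end_date=datetime.utcnow(),
    )

    # Fetch a single feature vector for inference.
    feature_vector = store.get_online_features(
        features=["horse_colic_stats:rectal temperature"],
        entity_rows=[{"Hospital Number": "533738"}],  # illustrative entity value
    ).to_dict()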
\ No newline at end of file diff --git a/cookbook/case_studies/feature_engineering/feast_integration/feature_eng_tasks.py b/cookbook/case_studies/feature_engineering/feast_integration/feature_eng_tasks.py index d6b0cc66bf..032f292e4e 100644 --- a/cookbook/case_studies/feature_engineering/feast_integration/feature_eng_tasks.py +++ b/cookbook/case_studies/feature_engineering/feast_integration/feature_eng_tasks.py @@ -1,7 +1,7 @@ """ Feature Engineering Tasks ------------------------- -We'll define the relevant feature engineering tasks to clean up the SQLite3 data. +We'll define the relevant feature engineering tasks to clean up the SQLite data. """ # %% @@ -27,7 +27,8 @@ # %% -# Next, we define a ``mean_median_imputer`` task to fill in the missing values of the dataset, for which we use `SimpleImputer `__ class from the ``scikit-learn`` library. +# We define a ``mean_median_imputer`` task to fill in the missing values of the dataset, for which we use the +# `SimpleImputer `__ class from the ``scikit-learn`` library. @task def mean_median_imputer( dataframe: pd.DataFrame, @@ -52,7 +53,8 @@ def mean_median_imputer( # %% # Let's define the other task called ``univariate_selection`` that does feature selection. -# The `SelectKBest `__ method removes all but the highest scoring features (data frame columns). +# The `SelectKBest `__ method removes all +# but the highest scoring features (DataFrame columns). @task def univariate_selection( dataframe: pd.DataFrame, num_features: int, data_class: str @@ -74,7 +76,3 @@ def univariate_selection( column_names.extend([data_class]) features = fit.transform(X) return pd.DataFrame(np.c_[features, y.to_numpy()], columns=column_names) - - -# %% -# The aforementioned feature engineering tasks are used as ``reference tasks`` while building the Flyte pipeline with Feast. diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/Dockerfile b/cookbook/case_studies/feature_engineering/sqlite_datacleaning/Dockerfile deleted file mode 100644 index 0491d9ddc0..0000000000 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/Dockerfile +++ /dev/null @@ -1,51 +0,0 @@ -FROM ubuntu:focal - -WORKDIR /root -ENV VENV /opt/venv -ENV LANG C.UTF-8 -ENV LC_ALL C.UTF-8 -ENV PYTHONPATH /root - -RUN : \ - && apt-get update \ - && apt install -y software-properties-common \ - && add-apt-repository ppa:deadsnakes/ppa - -RUN : \ - && apt-get update \ - && apt-get install -y python3.8 python3-pip python3-venv make build-essential libssl-dev curl vim - -# This is necessary for opencv to work -RUN apt-get update && apt-get install -y libsm6 libxext6 libxrender-dev ffmpeg - -# Install the AWS cli separately to prevent issues with boto being written over -RUN pip3 install awscli - -# Virtual environment -RUN python3.8 -m venv ${VENV} -RUN ${VENV}/bin/pip install wheel - -# Install Python dependencies -COPY sqlite_datacleaning/requirements.txt /root -RUN ${VENV}/bin/pip install -r /root/requirements.txt - -# Copy the makefile targets to expose on the container. This makes it easier to register. 
-COPY in_container.mk /root/Makefile -COPY sqlite_datacleaning/sandbox.config /root - -# Copy the actual code -COPY sqlite_datacleaning/ /root/sqlite_datacleaning/ - -# Copy over the helper script that the SDK relies on -RUN cp ${VENV}/bin/flytekit_venv /usr/local/bin/ -RUN chmod a+x /usr/local/bin/flytekit_venv - -RUN pip install -U https://github.com/flyteorg/flytekit/archive/62391eaff894188bb723f382af3de29a977233ce.zip#egg=flytekit - -# This tag is supplied by the build script and will be used to determine the version -# when registering tasks, workflows, and launch plans -ARG tag -ENV FLYTE_INTERNAL_IMAGE $tag - -# Enable the virtualenv for this image. Note this relies on the VENV variable we've set in this image. -ENTRYPOINT ["/usr/local/bin/flytekit_venv"] diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/Makefile b/cookbook/case_studies/feature_engineering/sqlite_datacleaning/Makefile deleted file mode 100644 index 4856caa448..0000000000 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/Makefile +++ /dev/null @@ -1,3 +0,0 @@ -PREFIX=sqlite_datacleaning -include ../../../common/common.mk -include ../../../common/leaf.mk diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/README.rst b/cookbook/case_studies/feature_engineering/sqlite_datacleaning/README.rst deleted file mode 100644 index ffe75ef513..0000000000 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/README.rst +++ /dev/null @@ -1,90 +0,0 @@ -Data Cleaning -------------- -Feature Engineering off-late has become one of the most prominent topics in Machine Learning. -It is the process of transforming raw data into features that better represent the underlying problem to the predictive models, resulting in improved model accuracy on unseen data. - -This tutorial will implement data cleaning of SQLite3 data, which does both data imputation and univariate feature selection. These are so-called feature engineering techniques. - -Why SQLite3? -============ -SQLite3 is written such that the task doesn't depend on the user's image. It basically: - -- Shifts the burden of writing the Dockerfile from the user using the task in workflows, to the author of the task type -- Allows the author to optimize the image that the task runs -- Works locally and remotely - -.. note:: - - SQLite3 container is special; the definition of the Python classes themselves is bundled in Flytekit, hence we just use the Flytekit image. - -.. tip:: - - SQLite3 is being used to showcase the example of using a ``TaskTemplate``. This is the same for SQLAlchemy. As for Athena, BigQuery, Hive plugins, a container is not required. The queries are registered with FlyteAdmin and sent directly to the respective engines. - -Where does Flyte fit in? -======================== -Flyte provides a way to train models and perform feature engineering as a single pipeline. - -.. admonition:: What's so special about this example? - - The pipeline doesn't build a container as such; it re-uses the pre-built task containers to construct the workflow! - -Dataset -======= -We'll be using the horse colic dataset wherein we'll determine if the lesion of the horse was surgical or not. This is a modified version of the original dataset. - -The dataset will have the following columns: - -.. 
list-table:: Horse Colic Features - :widths: 25 25 25 - - * - surgery - - Age - - Hospital Number - * - rectal temperature - - pulse - - respiratory rate - * - temperature of extremities - - peripheral pulse - - mucous membranes - * - capillary refill time - - pain - - peristalsis - * - abdominal distension - - nasogastric tube - - nasogastric reflux - * - nasogastric reflux PH - - rectal examination - - abdomen - * - packed cell volume - - total protein - - abdominocentesis appearance - * - abdomcentesis total protein - - outcome - - surgical lesion - -The horse colic dataset will be a compressed zip file consisting of the SQLite DB. - -Steps to Build the Pipeline -=========================== -- Define two feature engineering tasks -- "data imputation" and "univariate feature selection" -- Reference the tasks in the actual file -- Define an SQLite3 Task and generate FlyteSchema -- Pass the inputs through an imperative workflow to validate the dataset -- Return the resultant DataFrame - -Takeaways -========= -The example we're trying to demonstrate is a simple feature engineering job that you can seamlessly construct with Flyte. Here's what the nitty-gritties are: - -#. Source data is from SQL-like data sources -#. Procreated feature transforms -#. Ability to create a low-code platform -#. TaskTemplate within an imperative workflow - -.. tip:: - - If you're a data scientist, you needn't worry about the infrastructure overhead. Flyte provides an easy-to-use interface which looks just like a typical library. - -Code Walkthrough -================ diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/__init__.py b/cookbook/case_studies/feature_engineering/sqlite_datacleaning/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_tasks.py b/cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_tasks.py deleted file mode 100644 index 9cc28be692..0000000000 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_tasks.py +++ /dev/null @@ -1,58 +0,0 @@ -""" -Data Cleaning Tasks -------------------------- - -We'll define the relevant feature engineering tasks to clean up the SQLite3 data. -""" - -# %% -# Firstly, let's import the required libraries. -import numpy as np -import pandas as pd -from flytekit import task -from numpy.core.fromnumeric import sort -from sklearn.feature_selection import SelectKBest, f_classif -from sklearn.impute import SimpleImputer - -# %% -# Next, we define a ``mean_median_imputer`` task to fill in the missing values of the dataset, for which we use `SimpleImputer `__ class picked from the ``scikit-learn`` library. -@task -def mean_median_imputer( - dataframe: pd.DataFrame, - imputation_method: str, -) -> pd.DataFrame: - - dataframe = dataframe.replace("?", np.nan) - if imputation_method not in ["median", "mean"]: - raise ValueError("imputation_method takes only values 'median' or 'mean'") - - imputer = SimpleImputer(missing_values=np.nan, strategy=imputation_method) - - imputer = imputer.fit(dataframe) - dataframe[:] = imputer.transform(dataframe) - - return dataframe - -# %% -# This task returns the filled-in dataframe. - -# %% -# Let's define one other task called ``univariate_selection`` that does feature selection. -# The `SelectKBest `__ method removes all but the highest scoring features. 
- -@task -def univariate_selection( - dataframe: pd.DataFrame, split_mask: int, num_features: int -) -> pd.DataFrame: - - X = dataframe.iloc[:, 0:split_mask] - y = dataframe.iloc[:, split_mask] - test = SelectKBest(score_func=f_classif, k=num_features) - fit = test.fit(X, y) - indices = sort((-fit.scores_).argsort()[:num_features]) - column_names = map(dataframe.columns.__getitem__, indices) - features = fit.transform(X) - return pd.DataFrame(features, columns=column_names) - -# %% -# This task returns a dataframe with the specified number of columns. diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_workflow.py b/cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_workflow.py deleted file mode 100644 index 8d5625ebf2..0000000000 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_workflow.py +++ /dev/null @@ -1,133 +0,0 @@ -""" -Data Cleaning Imperative Workflow ---------------------------------------- - -This workflow makes use of the feature engineering tasks defined in the other file. -We'll build an SQLite3 data cleaning pipeline utilizing these tasks. - -.. tip:: - - You can simply import the tasks, but we use references because we are referring to the existing code. -""" - -# %% -# Let's import the libraries. -import pandas as pd -from flytekit import CronSchedule, LaunchPlan, Workflow, kwtypes, reference_task -from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task -from flytekit.types.schema import FlyteSchema - - -# %% -# Next, we define the reference tasks. A :py:func:`flytekit.reference_task` references the Flyte tasks that have already been defined, serialized, and registered. -# The primary advantage of using a reference task is to reduce the redundancy; we needn't define the task(s) again if we have multiple datasets that need to be feature-engineered. -# -# .. note:: -# -# The Macro ``{{ registration.version }}`` is filled during the registration time by `flytectl register`. This is usually not required for using reference tasks, you should -# ideally bind to a specific version of the entity - task / launchplan. But, in the case of this example, we are registering both the actual task ``sqlite_datacleaning.tasks.mean_median_imputer`` and -# and the workflow that references it. Thus we want it to actually be updated to the version of a specific release of FlyteSnacks. This is why we use the ``{{ registration.version }}`` macro. -# A typical example of reference task would look more like -# -# .. code-block:: python -# -# @reference_task( -# project="flytesnacks", -# domain="development", -# name="sqlite_datacleaning.tasks.mean_median_imputer", -# version="d06cebcfbeabc02b545eefa13a01c6ca992940c8", # If using GIT for versioning OR 0.16.0 is using semver -# ) -# def mean_median_imputer() -# ... -# -@reference_task( - project="flytesnacks", - domain="development", - name="sqlite_datacleaning.datacleaning_tasks.mean_median_imputer", - version="{{ registration.version }}", -) -def mean_median_imputer( - dataframe: pd.DataFrame, - imputation_method: str, -) -> pd.DataFrame: - ... - - -@reference_task( - project="flytesnacks", - domain="development", - name="sqlite_datacleaning.datacleaning_tasks.univariate_selection", - version="{{ registration.version }}", -) -def univariate_selection( - dataframe: pd.DataFrame, - split_mask: int, - num_features: int, -) -> pd.DataFrame: - ... - - -# %% -# .. 
note:: -# -# The ``version`` varies depending on the version assigned during the task registration process. - -# %% -# Finally, we define an imperative workflow that accepts the two reference tasks we've prototyped above. The data flow can be interpreted as follows: -# -# #. An SQLite3 task is defined to fetch the data batch -# #. The output (FlyteSchema) is passed to the ``mean_median_imputer`` task -# #. The output produced by ``mean_median_imputer`` is given to the ``univariate_selection`` task -# #. The dataframe generated by ``univariate_selection`` is the workflow output -wb = Workflow(name="sqlite_datacleaning.workflow.fe_wf") -wb.add_workflow_input("imputation_method", str) -wb.add_workflow_input("limit", int) -wf_in = wb.add_workflow_input("num_features", int) - -sql_task = SQLite3Task( - name="sqlite3.horse_colic", - query_template="select * from data limit {{ .inputs.limit }}", - inputs=kwtypes(limit=int), - output_schema_type=FlyteSchema, - task_config=SQLite3Config( - uri="https://cdn.discordapp.com/attachments/545481172399030272/852144760273502248/horse_colic.db.zip", - compressed=True, - ), -) - -node_t1 = wb.add_entity( - sql_task, - limit=wb.inputs["limit"], -) -node_t2 = wb.add_entity( - mean_median_imputer, - dataframe=node_t1.outputs["results"], - imputation_method=wb.inputs["imputation_method"], -) -node_t3 = wb.add_entity( - univariate_selection, - dataframe=node_t2.outputs["o0"], - split_mask=23, - num_features=wf_in, -) -wb.add_workflow_output( - "output_from_t3", node_t3.outputs["o0"], python_type=pd.DataFrame -) - -DEFAULT_INPUTS = {"limit": 100, "imputation_method": "mean", "num_features": 15} - -sqlite_dataclean_lp = LaunchPlan.get_or_create( - workflow=wb, - name="sqlite_datacleaning", - default_inputs=DEFAULT_INPUTS, - schedule=CronSchedule("0 10 * * ? 
*"), -) - -if __name__ == "__main__": - print( - wb( - limit=100, - imputation_method="mean", - num_features=15, - ) - ) diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.in b/cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.in deleted file mode 100644 index 444439d7d0..0000000000 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.in +++ /dev/null @@ -1,3 +0,0 @@ -flytekit>=0.19.1 -scikit-learn -numpy diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.txt b/cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.txt deleted file mode 100644 index 76e6de819b..0000000000 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.txt +++ /dev/null @@ -1,136 +0,0 @@ -# -# This file is autogenerated by pip-compile with python 3.8 -# To update, run: -# -# /Applications/Xcode.app/Contents/Developer/usr/bin/make requirements.txt -# -attrs==21.2.0 - # via scantree -certifi==2021.5.30 - # via requests -charset-normalizer==2.0.5 - # via requests -click==7.1.2 - # via flytekit -croniter==1.0.15 - # via flytekit -dataclasses-json==0.5.6 - # via flytekit -decorator==5.1.0 - # via retry -deprecated==1.2.13 - # via flytekit -dirhash==0.2.1 - # via flytekit -diskcache==5.2.1 - # via flytekit -docker-image-py==0.1.12 - # via flytekit -docstring-parser==0.10 - # via flytekit -flyteidl==0.20.2 - # via flytekit -flytekit==0.22.2 - # via -r requirements.in -grpcio==1.40.0 - # via flytekit -idna==3.2 - # via requests -importlib-metadata==4.8.1 - # via keyring -joblib==1.0.1 - # via scikit-learn -keyring==23.2.1 - # via flytekit -marshmallow==3.13.0 - # via - # dataclasses-json - # marshmallow-enum - # marshmallow-jsonschema -marshmallow-enum==1.5.1 - # via dataclasses-json -marshmallow-jsonschema==0.12.0 - # via flytekit -mypy-extensions==0.4.3 - # via typing-inspect -natsort==7.1.1 - # via flytekit -numpy==1.21.2 - # via - # -r requirements.in - # pandas - # pyarrow - # scikit-learn - # scipy -pandas==1.3.3 - # via flytekit -pathspec==0.9.0 - # via scantree -protobuf==3.17.3 - # via - # flyteidl - # flytekit -py==1.10.0 - # via retry -pyarrow==3.0.0 - # via flytekit -python-dateutil==2.8.1 - # via - # croniter - # flytekit - # pandas -python-json-logger==2.0.2 - # via flytekit -pytimeparse==1.1.8 - # via flytekit -pytz==2018.4 - # via - # flytekit - # pandas -regex==2021.8.28 - # via docker-image-py -requests==2.26.0 - # via - # flytekit - # responses -responses==0.14.0 - # via flytekit -retry==0.9.2 - # via flytekit -scantree==0.0.1 - # via dirhash -scikit-learn==0.24.2 - # via -r requirements.in -scipy==1.7.1 - # via scikit-learn -six==1.16.0 - # via - # flytekit - # grpcio - # protobuf - # python-dateutil - # responses - # scantree -sortedcontainers==2.4.0 - # via flytekit -statsd==3.3.0 - # via flytekit -threadpoolctl==2.2.0 - # via scikit-learn -typing-extensions==3.10.0.2 - # via typing-inspect -typing-inspect==0.7.1 - # via dataclasses-json -urllib3==1.26.6 - # via - # flytekit - # requests - # responses -wheel==0.37.0 - # via flytekit -wrapt==1.12.1 - # via - # deprecated - # flytekit -zipp==3.5.0 - # via importlib-metadata diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/sandbox.config b/cookbook/case_studies/feature_engineering/sqlite_datacleaning/sandbox.config deleted file mode 100644 index fc6f3e717d..0000000000 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/sandbox.config +++ 
/dev/null
@@ -1,7 +0,0 @@
-[sdk]
-workflow_packages=sqlite_datacleaning
-python_venv=flytekit_venv
-
-[auth]
-assumable_iam_role=arn:aws:iam::173840052742:role/flytefunctionaltestsbatchworker-production-iad
-raw_output_data_prefix=s3://flyte-demo/raw_data
diff --git a/cookbook/docs/conf.py b/cookbook/docs/conf.py
index 756cd38dcc..6251867f44 100644
--- a/cookbook/docs/conf.py
+++ b/cookbook/docs/conf.py
@@ -130,12 +130,15 @@ class CustomSorter(FileNameSortKey):
     "diabetes.py",
     "house_price_predictor.py",
     "multiregion_house_price_predictor.py",
-    "datacleaning_tasks.py",
-    "datacleaning_workflow.py",
-    "single_node.py",
+    ## Feature Engineering
+    "pytorch_single_node_and_gpu.py",
+    "pytorch_single_node_multi_gpu.py",
     "notebook.py",
     "notebook_and_task.py",
     "notebook_as_tasks.py",
+    "feature_eng_tasks.py",
+    "feast_dataobjects.py",
+    "feast_workflow.py",
 ]
 """
 Take a look at the code for the default sorter included in the sphinx_gallery to see how this works.
@@ -246,8 +249,8 @@ def __call__(self, filename):
     "../case_studies/ml_training/pima_diabetes",
     "../case_studies/ml_training/house_price_prediction",
     "../case_studies/ml_training/mnist_classifier",
-    "../case_studies/feature_engineering/sqlite_datacleaning",
     "../case_studies/feature_engineering/eda",
+    "../case_studies/feature_engineering/feast_integration",
     "../testing",
     "../core/containerization",
     "../deployment",
@@ -276,8 +279,8 @@ def __call__(self, filename):
     "auto/case_studies/ml_training/pima_diabetes",
     "auto/case_studies/ml_training/house_price_prediction",
     "auto/case_studies/ml_training/mnist_classifier",
-    "auto/case_studies/feature_engineering/sqlite_datacleaning",
     "auto/case_studies/feature_engineering/eda",
+    "auto/case_studies/feature_engineering/feast_integration",
     "auto/testing",
     "auto/core/containerization",
     "auto/deployment",
diff --git a/cookbook/docs/feature_engineering.rst b/cookbook/docs/feature_engineering.rst
index 8c93424153..70c74631af 100644
--- a/cookbook/docs/feature_engineering.rst
+++ b/cookbook/docs/feature_engineering.rst
@@ -4,29 +4,32 @@
 Feature Engineering
 ###################
 
+**Feature Engineering** has of late become one of the most prominent topics in Machine Learning.
+It is the process of transforming raw data into features that better represent the underlying problem to the predictive models, resulting in improved model accuracy on unseen data.
+
 .. panels::
     :header: text-center
 
-    .. link-button:: auto/case_studies/feature_engineering/sqlite_datacleaning/index
+    .. link-button:: auto/case_studies/feature_engineering/eda/index
        :type: ref
-       :text: Data Cleaning
+       :text: EDA, Feature Engineering, and Modeling With Papermill
        :classes: btn-block stretched-link
     ^^^^^^^^^^^^
-    Perform data imputation and univariate feature selection on SQLite3 dataset
+    How to use Jupyter notebooks within Flyte
 
     ---
 
-    .. link-button:: auto/case_studies/feature_engineering/eda/index
+    .. link-button:: auto/case_studies/feature_engineering/feast_integration/index
        :type: ref
-       :text: EDA, Feature Engineering, and Modeling With Papermill
+       :text: Data Cleaning and Feature Serving With Feast
        :classes: btn-block stretched-link
     ^^^^^^^^^^^^
-    How to use Jupyter notebook within Flyte
+    How to use Feast to serve data in Flyte
 
 .. toctree::
     :maxdepth: -1
     :caption: Contents
     :hidden:
 
-    auto/case_studies/feature_engineering/sqlite_datacleaning/index
     auto/case_studies/feature_engineering/eda/index
+    auto/case_studies/feature_engineering/feast_integration/index
diff --git a/cookbook/docs/tutorials.rst b/cookbook/docs/tutorials.rst
index 69fa0ef413..a502822000 100644
--- a/cookbook/docs/tutorials.rst
+++ b/cookbook/docs/tutorials.rst
@@ -24,8 +24,3 @@ feature engineering, model training, and batch predictions.
        :classes: btn-block stretched-link
     ^^^^^^^^^^^^
     Engineer the data features to improve your model accuracy.
-
-    ---
-    :body: d-flex center-card-content
-
-    *More tutorials coming soon...*
diff --git a/cookbook/flyte_tests_manifest.json b/cookbook/flyte_tests_manifest.json
index 0aa28e3d71..17ec4ee1f9 100644
--- a/cookbook/flyte_tests_manifest.json
+++ b/cookbook/flyte_tests_manifest.json
@@ -111,9 +111,9 @@
       "exit_message": ""
     }
   },{
-    "name": "case-studies-sqlite-datacleaning",
+    "name": "case-studies-feast-integration",
     "priority": "P2",
-    "path": "case_studies/feature_engineering/sqlite_datacleaning",
+    "path": "case_studies/feature_engineering/feast_integration",
    "exitCondition": {
      "exit_success": true,
      "exit_message": ""