Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Add feast feature store handler for enrichment transform #30957

Merged
merged 10 commits into from
Apr 26, 2024
34 changes: 34 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,40 @@
* ([#X](https://github.com/apache/beam/issues/X)).
-->

# [2.57.0] - Unreleased

## Highlights

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements

* Added Feast feature store handler for enrichment transform (Python) ([#30957](https://github.com/apache/beam/issues/30964)).

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

## Deprecations

* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).

## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).

# [2.56.0] - Unreleased

## Highlights
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import tempfile
from pathlib import Path
from typing import List
from typing import Optional

import apache_beam as beam
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
from apache_beam.transforms.enrichment import EnrichmentSourceHandler
from apache_beam.transforms.enrichment_handlers.utils import ExceptionLevel
from feast import FeatureStore

__all__ = [
'FeastFeatureStoreEnrichmentHandler',
]

_LOGGER = logging.getLogger(__name__)

LOCAL_FEATURE_STORE_YAML_FILENAME = 'fs_yaml_file.yaml'


def download_fs_yaml_file(gcs_fs_yaml_file: str):
"""Download the feature store config file for Feast."""
try:
fs = GCSFileSystem(pipeline_options={})
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
with fs.open(gcs_fs_yaml_file, 'r') as gcs_file:
with tempfile.NamedTemporaryFile(suffix=LOCAL_FEATURE_STORE_YAML_FILENAME,
delete=False) as local_file:
local_file.write(gcs_file.read())
return Path(local_file.name)
except Exception:
raise RuntimeError(
'error downloading the file %s locally to load the '
'Feast feature store.' % gcs_fs_yaml_file)


def _validate_feature_names(feature_names, feature_service_name):
"""Check if one of `feature_names` or `feature_service_name` is provided."""
if ((not feature_names and not feature_service_name) or
bool(feature_names and feature_service_name)):
raise ValueError(
'Please provide either a list of feature names to fetch '
'from online store or a feature service name for the '
'Feast online feature store!')
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved


def _validate_feature_store_yaml_path_exists(fs_yaml_file):
"""Check if the feature store yaml path exists."""
fs = GCSFileSystem(pipeline_options={})
if not fs.exists(fs_yaml_file):
raise ValueError(
'The feature store yaml path (%s) does not exist.' % fs_yaml_file)


class FeastFeatureStoreEnrichmentHandler(EnrichmentSourceHandler[beam.Row,
beam.Row]):
"""Enrichment handler to interact with the Feast feature store.

To specify the features to fetch from Feast online store,
please specify exactly one of `feature_names` or `feature_service_name`.

Use this handler with :class:`apache_beam.transforms.enrichment.Enrichment`
transform. To filter the features to enrich, use the `join_fn` param in
:class:`apache_beam.transforms.enrichment.Enrichment`.
"""
def __init__(
self,
entity_id: str,
feature_store_yaml_path: str,
feature_names: Optional[List[str]] = None,
feature_service_name: Optional[str] = "",
full_feature_names: Optional[bool] = False,
*,
exception_level: ExceptionLevel = ExceptionLevel.WARN,
):
"""Initializes an instance of `FeastFeatureStoreEnrichmentHandler`.

Args:
entity_id (str): entity name for the entity associated with the features.
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
The `entity_id` is used to extract the entity value from the input row.
feature_store_yaml_path (str): The path to a YAML configuration file for
the Feast feature store.
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
feature_names: A list of feature names to be retrieved from the online
Feast feature store.
feature_service_name (str): The name of the feature service containing
the features to fetch from the online Feast feature store.
full_feature_names (bool): Whether to use full feature names
(including namespaces, etc.). Defaults to False.
exception_level: a `enum.Enum` value from
`apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel`
to set the level when `None` feature values are fetched from the
online Feast store. Defaults to `ExceptionLevel.WARN`.
"""
self.entity_id = entity_id
self.feature_store_yaml_path = feature_store_yaml_path
self.feature_names = feature_names
self.feature_service_name = feature_service_name
self.full_feature_names = full_feature_names
self._exception_level = exception_level
_validate_feature_store_yaml_path_exists(self.feature_store_yaml_path)
_validate_feature_names(self.feature_names, self.feature_service_name)

def __enter__(self):
"""Connect with the Feast feature store."""
local_repo_path = download_fs_yaml_file(self.feature_store_yaml_path)
try:
self.store = FeatureStore(fs_yaml_file=local_repo_path)
except Exception:
raise RuntimeError(
'Invalid feature store yaml file provided. Make sure '
'the %s contains the valid configuration for Feast feature store.' %
self.feature_store_yaml_path)
if self.feature_service_name:
try:
self.features = self.store.get_feature_service(
self.feature_service_name)
except Exception:
raise RuntimeError(
'Could not find the feature service %s for the feature '
'store configured in %s.' %
(self.feature_service_name, self.feature_store_yaml_path))
else:
self.features = self.feature_names

def __call__(self, request: beam.Row, *args, **kwargs):
"""Fetches feature values for an entity-id from the Feast feature store.

Args:
request: the input `beam.Row` to enrich.
"""
request_dict = request._asdict()
feature_values = self.store.get_online_features(
features=self.features,
entity_rows=[{
self.entity_id: request_dict[self.entity_id]
}],
full_feature_names=self.full_feature_names).to_dict()
# get_online_features() returns a list of feature values per entity-id.
# Since we do this per entity, the list of feature values only contain
# a single element at position 0.
response_dict = {k: v[0] for k, v in feature_values.items()}
return request, beam.Row(**response_dict)

def __exit__(self, exc_type, exc_val, exc_tb):
"""Clean the instantiated Feast feature store client."""
self.store = None

def get_cache_key(self, request: beam.Row) -> str:
"""Returns a string formatted with unique entity-id for the feature values.
"""
return 'entity_id: %s' % request._asdict()[self.entity_id]
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Tests Feast feature store enrichment handler for enrichment transform.

See https://s.apache.org/feast-enrichment-test-setup
to set up test feast feature repository.
"""

import unittest

import pytest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline

# pylint: disable=ungrouped-imports
try:
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.feast_feature_store import \
FeastFeatureStoreEnrichmentHandler
from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store_it_test import ValidateResponse # pylint: disable=line-too-long
except ImportError:
raise unittest.SkipTest(
'Feast feature store test dependencies are not installed.')


@pytest.mark.uses_feast
class TestFeastEnrichmentHandler(unittest.TestCase):
def setUp(self) -> None:
self.feature_store_yaml_file = (
'gs://apache-beam-testing-enrichment/'
'feast-feature-store/repos/ecommerce/'
'feature_repo/feature_store.yaml')
self.feature_service_name = 'demograph_service'

def test_feast_enrichment(self):
requests = [
beam.Row(user_id=2, product_id=1),
beam.Row(user_id=6, product_id=2),
beam.Row(user_id=9, product_id=3),
]
expected_fields = [
'user_id', 'product_id', 'state', 'country', 'gender', 'age'
]
handler = FeastFeatureStoreEnrichmentHandler(
entity_id='user_id',
feature_store_yaml_path=self.feature_store_yaml_file,
feature_service_name=self.feature_service_name,
)

with TestPipeline(is_integration_test=True) as test_pipeline:
_ = (
test_pipeline
| beam.Create(requests)
| Enrichment(handler)
| beam.ParDo(ValidateResponse(expected_fields)))

def test_feast_enrichment_bad_feature_service_name(self):
"""Test raising an error when a bad feature service name is given."""
requests = [
beam.Row(user_id=1, product_id=1),
]
handler = FeastFeatureStoreEnrichmentHandler(
entity_id='user_id',
feature_store_yaml_path=self.feature_store_yaml_file,
feature_service_name="bad_name",
)

with self.assertRaises(RuntimeError):
test_pipeline = beam.Pipeline()
_ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
res = test_pipeline.run()
res.wait_until_finish()


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest

try:
from apache_beam.transforms.enrichment_handlers.feast_feature_store import \
FeastFeatureStoreEnrichmentHandler
except ImportError:
raise unittest.SkipTest(
'Feast feature store test dependencies are not installed.')


class TestFeastFeatureStoreHandler(unittest.TestCase):
def setUp(self) -> None:
self.feature_store_yaml_file = (
'gs://apache-beam-testing-enrichment/'
'feast-feature-store/repos/ecommerce/'
'feature_repo/feature_store.yaml')
self.feature_service_name = 'demograph_service'

def test_feature_store_yaml_path_exists(self):
feature_store_yaml_path = 'gs://apache-beam-testing-enrichment/invalid.yaml'
with self.assertRaises(ValueError):
_ = FeastFeatureStoreEnrichmentHandler(
entity_id='user_id',
feature_store_yaml_path=feature_store_yaml_path,
feature_service_name=self.feature_service_name,
)

def test_feast_enrichment_no_feature_service(self):
"""Test raising an error in case of no feature service name."""
with self.assertRaises(ValueError):
_ = FeastFeatureStoreEnrichmentHandler(
entity_id='user_id',
feature_store_yaml_path=self.feature_store_yaml_file,
)


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

feast[gcp]
1 change: 1 addition & 0 deletions sdks/python/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ markers =
vertex_ai_postcommit: vertex ai postcommits that need additional deps.
uses_testcontainer: tests that use testcontainers.
uses_mock_api: tests that uses the mock API cluster.
uses_feast: tests that uses feast in some way

# Default timeout intended for unit tests.
# If certain tests need a different value, please see the docs on how to
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/scripts/generate_pydoc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ autodoc_member_order = 'bysource'
autodoc_mock_imports = ["tensorrt", "cuda", "torch",
"onnxruntime", "onnx", "tensorflow", "tensorflow_hub",
"tensorflow_transform", "tensorflow_metadata", "transformers", "xgboost", "datatable", "transformers",
"sentence_transformers", "redis", "tensorflow_text",
"sentence_transformers", "redis", "tensorflow_text", "feast",
]

# Allow a special section for documenting DataFrame API
Expand Down
Loading
Loading