From 29ee274cb48a599d324870754b25994f815f9bc0 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Fri, 12 Apr 2024 15:45:41 -0400 Subject: [PATCH 1/6] add feast feature store handler --- .../trigger_files/beam_PostCommit_Python.json | 3 - .../feast_feature_store.py | 155 ++++++++++++++++++ .../feast_feature_store_it_test.py | 117 +++++++++++++ .../feast_tests_requirements.txt | 18 ++ sdks/python/pytest.ini | 1 + sdks/python/test-suites/direct/common.gradle | 28 ++++ 6 files changed, 319 insertions(+), 3 deletions(-) delete mode 100644 .github/trigger_files/beam_PostCommit_Python.json create mode 100644 sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py create mode 100644 sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py create mode 100644 sdks/python/apache_beam/transforms/enrichment_handlers/feast_tests_requirements.txt diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json deleted file mode 100644 index c4edaa85a89d..000000000000 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "comment": "Modify this file in a trivial way to cause this test suite to run" -} diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py new file mode 100644 index 000000000000..7e227da4d82d --- /dev/null +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import logging +import tempfile +from pathlib import Path +from typing import List + +from feast import FeatureStore + +import apache_beam as beam +from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem +from apache_beam.transforms.enrichment import EnrichmentSourceHandler +from apache_beam.transforms.enrichment_handlers.utils import ExceptionLevel + +__all__ = [ + 'FeastFeatureStoreEnrichmentHandler', +] + +_LOGGER = logging.getLogger(__name__) + +LOCAL_FEATURE_STORE_YAML_FILENAME = 'fs_yaml_file.yaml' + + +def download_fs_yaml_file(gcs_fs_yaml_file: str): + """Download the feature store config file for Feast.""" + try: + fs = GCSFileSystem(pipeline_options={}) + with fs.open(gcs_fs_yaml_file, 'r') as gcs_file: + with tempfile.NamedTemporaryFile(suffix=LOCAL_FEATURE_STORE_YAML_FILENAME, + delete=False) as local_file: + local_file.write(gcs_file.read()) + return Path(local_file.name) + except Exception: + raise RuntimeError( + 'error downloading the file %s locally to load the ' + 'Feast feature store.') + + +def _validate_feature_names(feature_names, feature_service_name): + """Validate either `feature_names` or `feature_service_name` is provided.""" + if not bool(feature_names or feature_service_name): + raise ValueError( + 'Please provide either a list of feature names to fetch ' + 'from online store or a feature service name for the ' + 'online store!') + + +class FeastFeatureStoreEnrichmentHandler(EnrichmentSourceHandler[beam.Row, + beam.Row]): + """Enrichment handler to interact with the Feast feature store. + + Use this handler with :class:`apache_beam.transforms.enrichment.Enrichment` + transform. + + To filter the features to enrich, use the `join_fn` param in + :class:`apache_beam.transforms.enrichment.Enrichment`. + """ + def __init__( + self, + entity_id: str, + feature_store_yaml_path: str, + feature_names: List[str] = None, + feature_service_name: str = "", + full_feature_names: bool = False, + *, + exception_level: ExceptionLevel = ExceptionLevel.WARN, + ): + """Initializes an instance of `FeastFeatureStoreEnrichmentHandler`. + + Args: + entity_id (str): entity name for the entity associated with the features. + feature_store_yaml_path (str): The path to a YAML configuration file for + the Feast feature store. + feature_names: A list of feature names to be retrieved from the online + Feast feature store. The `feature_names` will be ignored if + `feature_service_name` is also provided. + feature_service_name (str): The name of the feature service containing + the features to fetch from the online Feast feature store. + full_feature_names (bool): Whether to use full feature names + (including namespaces, etc.). Defaults to False. + exception_level: a `enum.Enum` value from + `apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel` + to set the level when `None` feature values are fetched from the + online Feast store. Defaults to `ExceptionLevel.WARN`. + """ + self.entity_id = entity_id + self.feature_store_yaml_path = feature_store_yaml_path + self.feature_names = feature_names + self.feature_service_name = feature_service_name + self.full_feature_names = full_feature_names + self._exception_level = exception_level + _validate_feature_names(self.feature_names, self.feature_service_name) + + def __enter__(self): + """Connect with the Feast Feature Store.""" + local_repo_path = download_fs_yaml_file(self.feature_store_yaml_path) + try: + self.store = FeatureStore(fs_yaml_file=local_repo_path) + except Exception: + raise RuntimeError( + 'Invalid feature store yaml file provided. Make sure ' + 'the `feature_store_yaml_path` contains the valid ' + 'configuration for Feast feature store.') + if self.feature_service_name: + try: + self.features = self.store.get_feature_service( + self.feature_service_name) + except Exception: + raise RuntimeError( + 'Could find the feature service %s for the feature ' + 'store configured in `feature_store_yaml_path`.') + else: + self.features = self.feature_names + + def __call__(self, request: beam.Row, *args, **kwargs): + """Fetches feature values for an entity-id from the Feast feature store. + + Args: + request: the input `beam.Row` to enrich. + """ + request_dict = request._asdict() + feature_values = self.store.get_online_features( + features=self.features, + entity_rows=[{ + self.entity_id: request_dict[self.entity_id] + }], + full_feature_names=self.full_feature_names).to_dict() + # get_online_features() returns a list of feature values per entity-id. + # Since we do this per entity, the list of feature values only contain + # a single element at position 0. + response_dict = {k: v[0] for k, v in feature_values.items()} + return request, beam.Row(**response_dict) + + def __exit__(self, exc_type, exc_val, exc_tb): + """Clean the instantiated Feast feature store client.""" + self.store = None + + def get_cache_key(self, request: beam.Row) -> str: + """Returns a string formatted with unique entity-id for the feature values. + """ + return 'entity_id: %s' % request._asdict()[self.entity_id] diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py new file mode 100644 index 000000000000..70f7b90a91e0 --- /dev/null +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py @@ -0,0 +1,117 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests Feast feature store enrichment handler for enrichment transform. + +See https://s.apache.org/feast-enrichment-test-setup +to set up test feast feature repository. +""" + +import unittest + +import pytest + +import apache_beam as beam +from apache_beam.testing.test_pipeline import TestPipeline + +# pylint: disable=ungrouped-imports +try: + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.feast_feature_store import \ + FeastFeatureStoreEnrichmentHandler + from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store_it_test import ValidateResponse # pylint: disable=line-too-long +except ImportError: + raise unittest.SkipTest( + 'Feast feature store test dependencies are not installed.') + + +@pytest.mark.uses_feast +class TestFeastEnrichmentHandler(unittest.TestCase): + def setUp(self) -> None: + self.feature_store_yaml_file = ( + 'gs://apache-beam-testing-enrichment/' + 'feast-feature-store/repos/ecommerce/' + 'feature_repo/feature_store.yaml') + self.feature_service_name = 'demograph_service' + + def test_feast_enrichment(self): + requests = [ + beam.Row(user_id=2, product_id=1), + beam.Row(user_id=6, product_id=2), + beam.Row(user_id=9, product_id=3), + ] + expected_fields = [ + 'user_id', 'product_id', 'state', 'country', 'gender', 'age' + ] + handler = FeastFeatureStoreEnrichmentHandler( + entity_id='user_id', + feature_store_yaml_path=self.feature_store_yaml_file, + feature_service_name=self.feature_service_name, + ) + + with TestPipeline(is_integration_test=True) as test_pipeline: + _ = ( + test_pipeline + | beam.Create(requests) + | Enrichment(handler) + | beam.ParDo(ValidateResponse(expected_fields))) + + def test_feast_enrichment_bad_feature_service_name(self): + """Test raising an error when a bad feature service name is given.""" + requests = [ + beam.Row(user_id=1, product_id=1), + ] + handler = FeastFeatureStoreEnrichmentHandler( + entity_id='user_id', + feature_store_yaml_path=self.feature_store_yaml_file, + feature_service_name="bad_name", + ) + + with self.assertRaises(RuntimeError): + test_pipeline = beam.Pipeline() + _ = (test_pipeline | beam.Create(requests) | Enrichment(handler)) + res = test_pipeline.run() + res.wait_until_finish() + + def test_feast_enrichment_bad_yaml_path(self): + """Test raising an error when wrong yaml file is passed.""" + requests = [ + beam.Row(user_id=1, product_id=1), + ] + + with self.assertRaises(RuntimeError): + handler = FeastFeatureStoreEnrichmentHandler( + entity_id='user_id', + feature_store_yaml_path='gs://bad_path', + feature_service_name="bad_name", + ) + test_pipeline = beam.Pipeline() + _ = (test_pipeline | beam.Create(requests) | Enrichment(handler)) + res = test_pipeline.run() + res.wait_until_finish() + + def test_feast_enrichment_no_feature_service(self): + """Test raising an error in case of no feature service name.""" + with self.assertRaises(ValueError): + _ = FeastFeatureStoreEnrichmentHandler( + entity_id='user_id', + feature_store_yaml_path=self.feature_store_yaml_file, + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_tests_requirements.txt b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_tests_requirements.txt new file mode 100644 index 000000000000..3e0c1f50bd75 --- /dev/null +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_tests_requirements.txt @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +feast[gcp] \ No newline at end of file diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini index 8df74adbc62e..b10acaac71cd 100644 --- a/sdks/python/pytest.ini +++ b/sdks/python/pytest.ini @@ -67,6 +67,7 @@ markers = vertex_ai_postcommit: vertex ai postcommits that need additional deps. uses_testcontainer: tests that use testcontainers. uses_mock_api: tests that uses the mock API cluster. + uses_feast: tests that uses feast in some way # Default timeout intended for unit tests. # If certain tests need a different value, please see the docs on how to diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle index 1851a4d9cd0b..c79c5f66abbc 100644 --- a/sdks/python/test-suites/direct/common.gradle +++ b/sdks/python/test-suites/direct/common.gradle @@ -392,6 +392,33 @@ task testcontainersTest { } } +// Integration tests that uses feast +task feastIntegrationTest { + dependsOn 'installGcpTest' + dependsOn ':sdks:python:sdist' + def requirementsFile = "${rootDir}/sdks/python/apache_beam/transforms/enrichment_handlers/feast_tests_requirements.txt" + doFirst { + exec { + executable 'sh' + args '-c', ". ${envdir}/bin/activate && pip install -r $requirementsFile" + } + } + doLast { + def testOpts = basicTestOpts + def argMap = [ + "test_opts": testOpts, + "suite": "postCommitIT-direct-py${pythonVersionSuffix}", + "collect": "uses_feast", + "runner": "TestDirectRunner" + ] + def cmdArgs = mapToArgString(argMap) + exec { + executable 'sh' + args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs" + } + } +} + // Add all the RunInference framework IT tests to this gradle task that runs on Direct Runner Post commit suite. project.tasks.register("inferencePostCommitIT") { dependsOn = [ @@ -401,6 +428,7 @@ project.tasks.register("inferencePostCommitIT") { 'xgboostInferenceTest', 'transformersInferenceTest', 'testcontainersTest', + 'feastIntegrationTest', // (TODO) https://github.com/apache/beam/issues/25799 // uncomment tfx bsl tests once tfx supports protobuf 4.x // 'tfxInferenceTest', From 8279461db3c0fc1f93aafd41a7e33de482ee5efb Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Sun, 14 Apr 2024 22:41:58 -0400 Subject: [PATCH 2/6] add changes, unit test --- CHANGES.md | 2 +- .../feast_feature_store.py | 19 +++++-- .../feast_feature_store_it_test.py | 8 --- .../feast_feature_store_test.py | 54 +++++++++++++++++++ sdks/python/scripts/generate_pydoc.sh | 2 +- 5 files changed, 70 insertions(+), 15 deletions(-) create mode 100644 sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_test.py diff --git a/CHANGES.md b/CHANGES.md index 941ba23a7573..d4a3120b7612 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,7 +68,7 @@ ## New Features / Improvements -* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Added Feast feature store handler for enrichment transform (Python) ([#30957](https://github.com/apache/beam/issues/30964)). ## Breaking Changes diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py index 7e227da4d82d..8405360ab0c3 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py @@ -18,13 +18,13 @@ import tempfile from pathlib import Path from typing import List - -from feast import FeatureStore +from typing import Optional import apache_beam as beam from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem from apache_beam.transforms.enrichment import EnrichmentSourceHandler from apache_beam.transforms.enrichment_handlers.utils import ExceptionLevel +from feast import FeatureStore __all__ = [ 'FeastFeatureStoreEnrichmentHandler', @@ -59,6 +59,14 @@ def _validate_feature_names(feature_names, feature_service_name): 'online store!') +def _validate_feature_store_yaml_path_exists(fs_yaml_file): + """Check if the feature store yaml path exists.""" + fs = GCSFileSystem(pipeline_options={}) + if not fs.exists(fs_yaml_file): + raise ValueError( + 'The feature store yaml path (%s) does not exist.' % fs_yaml_file) + + class FeastFeatureStoreEnrichmentHandler(EnrichmentSourceHandler[beam.Row, beam.Row]): """Enrichment handler to interact with the Feast feature store. @@ -73,9 +81,9 @@ def __init__( self, entity_id: str, feature_store_yaml_path: str, - feature_names: List[str] = None, - feature_service_name: str = "", - full_feature_names: bool = False, + feature_names: Optional[List[str]] = None, + feature_service_name: Optional[str] = "", + full_feature_names: Optional[bool] = False, *, exception_level: ExceptionLevel = ExceptionLevel.WARN, ): @@ -103,6 +111,7 @@ def __init__( self.feature_service_name = feature_service_name self.full_feature_names = full_feature_names self._exception_level = exception_level + _validate_feature_store_yaml_path_exists(self.feature_store_yaml_path) _validate_feature_names(self.feature_names, self.feature_service_name) def __enter__(self): diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py index 70f7b90a91e0..60b0818a08af 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py @@ -104,14 +104,6 @@ def test_feast_enrichment_bad_yaml_path(self): res = test_pipeline.run() res.wait_until_finish() - def test_feast_enrichment_no_feature_service(self): - """Test raising an error in case of no feature service name.""" - with self.assertRaises(ValueError): - _ = FeastFeatureStoreEnrichmentHandler( - entity_id='user_id', - feature_store_yaml_path=self.feature_store_yaml_file, - ) - if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_test.py new file mode 100644 index 000000000000..4418d1bf5449 --- /dev/null +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_test.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +try: + from apache_beam.transforms.enrichment_handlers.feast_feature_store import \ + FeastFeatureStoreEnrichmentHandler +except ImportError: + raise unittest.SkipTest( + 'Feast feature store test dependencies are not installed.') + + +class TestFeastFeatureStoreHandler(unittest.TestCase): + def setUp(self) -> None: + self.feature_store_yaml_file = ( + 'gs://apache-beam-testing-enrichment/' + 'feast-feature-store/repos/ecommerce/' + 'feature_repo/feature_store.yaml') + self.feature_service_name = 'demograph_service' + + def test_feature_store_yaml_path_exists(self): + feature_store_yaml_path = 'gs://apache-beam-testing-enrichment/invalid.yaml' + with self.assertRaises(ValueError): + _ = FeastFeatureStoreEnrichmentHandler( + entity_id='user_id', + feature_store_yaml_path=feature_store_yaml_path, + feature_service_name=self.feature_service_name, + ) + + def test_feast_enrichment_no_feature_service(self): + """Test raising an error in case of no feature service name.""" + with self.assertRaises(ValueError): + _ = FeastFeatureStoreEnrichmentHandler( + entity_id='user_id', + feature_store_yaml_path=self.feature_store_yaml_file, + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh index 1e25f54dc462..183aa746957c 100755 --- a/sdks/python/scripts/generate_pydoc.sh +++ b/sdks/python/scripts/generate_pydoc.sh @@ -134,7 +134,7 @@ autodoc_member_order = 'bysource' autodoc_mock_imports = ["tensorrt", "cuda", "torch", "onnxruntime", "onnx", "tensorflow", "tensorflow_hub", "tensorflow_transform", "tensorflow_metadata", "transformers", "xgboost", "datatable", "transformers", - "sentence_transformers", "redis", "tensorflow_text", + "sentence_transformers", "redis", "tensorflow_text", "feast", ] # Allow a special section for documenting DataFrame API From 7be053fcad44d58e980e240c69dbac8832cb0872 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Mon, 15 Apr 2024 09:25:40 -0400 Subject: [PATCH 3/6] remove duplicate test, add doc --- .../enrichment_handlers/feast_feature_store.py | 1 + .../feast_feature_store_it_test.py | 17 ----------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py index 8405360ab0c3..0b99bf5e08f4 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py @@ -91,6 +91,7 @@ def __init__( Args: entity_id (str): entity name for the entity associated with the features. + The `entity_id` is used to extract the entity value from the input row. feature_store_yaml_path (str): The path to a YAML configuration file for the Feast feature store. feature_names: A list of feature names to be retrieved from the online diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py index 60b0818a08af..1f1623359113 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py @@ -87,23 +87,6 @@ def test_feast_enrichment_bad_feature_service_name(self): res = test_pipeline.run() res.wait_until_finish() - def test_feast_enrichment_bad_yaml_path(self): - """Test raising an error when wrong yaml file is passed.""" - requests = [ - beam.Row(user_id=1, product_id=1), - ] - - with self.assertRaises(RuntimeError): - handler = FeastFeatureStoreEnrichmentHandler( - entity_id='user_id', - feature_store_yaml_path='gs://bad_path', - feature_service_name="bad_name", - ) - test_pipeline = beam.Pipeline() - _ = (test_pipeline | beam.Create(requests) | Enrichment(handler)) - res = test_pipeline.run() - res.wait_until_finish() - if __name__ == '__main__': unittest.main() From 5db3e22019f47992f53e2598551a8e7201c9c425 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 16 Apr 2024 10:06:36 -0400 Subject: [PATCH 4/6] correct string formatting --- .../trigger_files/beam_PostCommit_Python.json | 4 +++ .../feast_feature_store.py | 30 ++++++++++--------- 2 files changed, 20 insertions(+), 14 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_Python.json diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json new file mode 100644 index 000000000000..63bd5651def0 --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run" +} + diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py index 0b99bf5e08f4..8adf06ae96a6 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py @@ -47,16 +47,17 @@ def download_fs_yaml_file(gcs_fs_yaml_file: str): except Exception: raise RuntimeError( 'error downloading the file %s locally to load the ' - 'Feast feature store.') + 'Feast feature store.' % gcs_fs_yaml_file) def _validate_feature_names(feature_names, feature_service_name): - """Validate either `feature_names` or `feature_service_name` is provided.""" - if not bool(feature_names or feature_service_name): + """Check if one of `feature_names` or `feature_service_name` is provided.""" + if ((not feature_names and not feature_service_name) or + bool(feature_names and feature_service_name)): raise ValueError( 'Please provide either a list of feature names to fetch ' 'from online store or a feature service name for the ' - 'online store!') + 'Feast online feature store!') def _validate_feature_store_yaml_path_exists(fs_yaml_file): @@ -71,10 +72,11 @@ class FeastFeatureStoreEnrichmentHandler(EnrichmentSourceHandler[beam.Row, beam.Row]): """Enrichment handler to interact with the Feast feature store. - Use this handler with :class:`apache_beam.transforms.enrichment.Enrichment` - transform. + To specify the features to fetch from Feast online store, + please specify exactly one of `feature_names` or `feature_service_name`. - To filter the features to enrich, use the `join_fn` param in + Use this handler with :class:`apache_beam.transforms.enrichment.Enrichment` + transform. To filter the features to enrich, use the `join_fn` param in :class:`apache_beam.transforms.enrichment.Enrichment`. """ def __init__( @@ -95,8 +97,7 @@ def __init__( feature_store_yaml_path (str): The path to a YAML configuration file for the Feast feature store. feature_names: A list of feature names to be retrieved from the online - Feast feature store. The `feature_names` will be ignored if - `feature_service_name` is also provided. + Feast feature store. feature_service_name (str): The name of the feature service containing the features to fetch from the online Feast feature store. full_feature_names (bool): Whether to use full feature names @@ -116,23 +117,24 @@ def __init__( _validate_feature_names(self.feature_names, self.feature_service_name) def __enter__(self): - """Connect with the Feast Feature Store.""" + """Connect with the Feast feature store.""" local_repo_path = download_fs_yaml_file(self.feature_store_yaml_path) try: self.store = FeatureStore(fs_yaml_file=local_repo_path) except Exception: raise RuntimeError( 'Invalid feature store yaml file provided. Make sure ' - 'the `feature_store_yaml_path` contains the valid ' - 'configuration for Feast feature store.') + 'the %s contains the valid configuration for Feast feature store.' % + self.feature_store_yaml_path) if self.feature_service_name: try: self.features = self.store.get_feature_service( self.feature_service_name) except Exception: raise RuntimeError( - 'Could find the feature service %s for the feature ' - 'store configured in `feature_store_yaml_path`.') + 'Could not find the feature service %s for the feature ' + 'store configured in %s.' % + (self.feature_service_name, self.feature_store_yaml_path)) else: self.features = self.feature_names From 86fbf15502bd8bfc5015182f83bf1b193701ab4d Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 24 Apr 2024 17:21:07 -0400 Subject: [PATCH 5/6] add lambda, use filesystems, start test --- .../trigger_files/beam_PostCommit_Python.json | 1 - .../feast_feature_store.py | 61 ++++++++++++++----- .../feast_feature_store_it_test.py | 29 +++++++++ .../feast_feature_store_test.py | 13 ++++ 4 files changed, 88 insertions(+), 16 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 63bd5651def0..c4edaa85a89d 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,4 +1,3 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run" } - diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py index 8adf06ae96a6..f1eec2f24417 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py @@ -17,11 +17,14 @@ import logging import tempfile from pathlib import Path +from typing import Any +from typing import Callable from typing import List +from typing import Mapping from typing import Optional import apache_beam as beam -from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem +from apache_beam.io.filesystems import FileSystems from apache_beam.transforms.enrichment import EnrichmentSourceHandler from apache_beam.transforms.enrichment_handlers.utils import ExceptionLevel from feast import FeatureStore @@ -30,6 +33,8 @@ 'FeastFeatureStoreEnrichmentHandler', ] +EntityRowFn = Callable[[beam.Row], Mapping[str, Any]] + _LOGGER = logging.getLogger(__name__) LOCAL_FEATURE_STORE_YAML_FILENAME = 'fs_yaml_file.yaml' @@ -38,8 +43,7 @@ def download_fs_yaml_file(gcs_fs_yaml_file: str): """Download the feature store config file for Feast.""" try: - fs = GCSFileSystem(pipeline_options={}) - with fs.open(gcs_fs_yaml_file, 'r') as gcs_file: + with FileSystems.open(gcs_fs_yaml_file, 'r') as gcs_file: with tempfile.NamedTemporaryFile(suffix=LOCAL_FEATURE_STORE_YAML_FILENAME, delete=False) as local_file: local_file.write(gcs_file.read()) @@ -55,19 +59,28 @@ def _validate_feature_names(feature_names, feature_service_name): if ((not feature_names and not feature_service_name) or bool(feature_names and feature_service_name)): raise ValueError( - 'Please provide either a list of feature names to fetch ' - 'from online store or a feature service name for the ' - 'Feast online feature store!') + 'Please provide exactly one of a list of feature names to fetch ' + 'from online store (`feature_names`) or a feature service name for ' + 'the Feast online feature store (`feature_service_name`).') def _validate_feature_store_yaml_path_exists(fs_yaml_file): """Check if the feature store yaml path exists.""" - fs = GCSFileSystem(pipeline_options={}) - if not fs.exists(fs_yaml_file): + if not FileSystems.exists(fs_yaml_file): raise ValueError( 'The feature store yaml path (%s) does not exist.' % fs_yaml_file) +def _validate_entity_key_exists(entity_id, entity_row_fn): + """Checks if the entity key or a lambda to build entity key exists.""" + if ((not entity_row_fn and not entity_id) or + bool(entity_row_fn and entity_id)): + raise ValueError( + "Please specify exactly one of a `entity_id` or a lambda " + "function with `entity_row_fn` to extract the entity id " + "from the input row.") + + class FeastFeatureStoreEnrichmentHandler(EnrichmentSourceHandler[beam.Row, beam.Row]): """Enrichment handler to interact with the Feast feature store. @@ -81,12 +94,13 @@ class FeastFeatureStoreEnrichmentHandler(EnrichmentSourceHandler[beam.Row, """ def __init__( self, - entity_id: str, feature_store_yaml_path: str, feature_names: Optional[List[str]] = None, feature_service_name: Optional[str] = "", full_feature_names: Optional[bool] = False, + entity_id: str = "", *, + entity_row_fn: Optional[EntityRowFn] = None, exception_level: ExceptionLevel = ExceptionLevel.WARN, ): """Initializes an instance of `FeastFeatureStoreEnrichmentHandler`. @@ -95,13 +109,21 @@ def __init__( entity_id (str): entity name for the entity associated with the features. The `entity_id` is used to extract the entity value from the input row. feature_store_yaml_path (str): The path to a YAML configuration file for - the Feast feature store. + the Feast feature store. See + https://docs.feast.dev/reference/feature-repository/feature-store-yaml + for configuration options supported by Feast. feature_names: A list of feature names to be retrieved from the online Feast feature store. feature_service_name (str): The name of the feature service containing the features to fetch from the online Feast feature store. full_feature_names (bool): Whether to use full feature names (including namespaces, etc.). Defaults to False. + entity_row_fn: a lambda function that returns a dictionary with + a mapping from the entity key column name to entity key value from the + input row. It is used to build/extract the entity dict for feature + retrieval. + See https://docs.feast.dev/getting-started/concepts/feature-retrieval + for more information. exception_level: a `enum.Enum` value from `apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel` to set the level when `None` feature values are fetched from the @@ -112,7 +134,9 @@ def __init__( self.feature_names = feature_names self.feature_service_name = feature_service_name self.full_feature_names = full_feature_names + self.entity_row_fn = entity_row_fn self._exception_level = exception_level + _validate_entity_key_exists(self.entity_id, self.entity_row_fn) _validate_feature_store_yaml_path_exists(self.feature_store_yaml_path) _validate_feature_names(self.feature_names, self.feature_service_name) @@ -144,12 +168,14 @@ def __call__(self, request: beam.Row, *args, **kwargs): Args: request: the input `beam.Row` to enrich. """ - request_dict = request._asdict() + if self.entity_row_fn: + entity_dict = self.entity_row_fn(request) + else: + request_dict = request._asdict() + entity_dict = {self.entity_id: request_dict[self.entity_id]} feature_values = self.store.get_online_features( features=self.features, - entity_rows=[{ - self.entity_id: request_dict[self.entity_id] - }], + entity_rows=[entity_dict], full_feature_names=self.full_feature_names).to_dict() # get_online_features() returns a list of feature values per entity-id. # Since we do this per entity, the list of feature values only contain @@ -164,4 +190,9 @@ def __exit__(self, exc_type, exc_val, exc_tb): def get_cache_key(self, request: beam.Row) -> str: """Returns a string formatted with unique entity-id for the feature values. """ - return 'entity_id: %s' % request._asdict()[self.entity_id] + if self.entity_row_fn: + entity_dict = self.entity_row_fn(request) + entity_id = list(entity_dict.keys())[0] + else: + entity_id = self.entity_id + return 'entity_id: %s' % request._asdict()[entity_id] diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py index 1f1623359113..89cb39c2c19c 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py @@ -22,6 +22,8 @@ """ import unittest +from typing import Any +from typing import Mapping import pytest @@ -39,6 +41,11 @@ 'Feast feature store test dependencies are not installed.') +def _entity_row_fn(request: beam.Row) -> Mapping[str, Any]: + entity_value = request.user_id # type: ignore[attr-defined] + return {'user_id': entity_value} + + @pytest.mark.uses_feast class TestFeastEnrichmentHandler(unittest.TestCase): def setUp(self) -> None: @@ -87,6 +94,28 @@ def test_feast_enrichment_bad_feature_service_name(self): res = test_pipeline.run() res.wait_until_finish() + def test_feast_enrichment_with_lambda(self): + requests = [ + beam.Row(user_id=2, product_id=1), + beam.Row(user_id=6, product_id=2), + beam.Row(user_id=9, product_id=3), + ] + expected_fields = [ + 'user_id', 'product_id', 'state', 'country', 'gender', 'age' + ] + handler = FeastFeatureStoreEnrichmentHandler( + feature_store_yaml_path=self.feature_store_yaml_file, + feature_service_name=self.feature_service_name, + entity_row_fn=_entity_row_fn, + ) + + with TestPipeline(is_integration_test=True) as test_pipeline: + _ = ( + test_pipeline + | beam.Create(requests) + | Enrichment(handler) + | beam.ParDo(ValidateResponse(expected_fields))) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_test.py index 4418d1bf5449..764086ab2c98 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_test.py @@ -16,9 +16,13 @@ # import unittest +from parameterized import parameterized + try: from apache_beam.transforms.enrichment_handlers.feast_feature_store import \ FeastFeatureStoreEnrichmentHandler + from apache_beam.transforms.enrichment_handlers.feast_feature_store_it_test \ + import _entity_row_fn except ImportError: raise unittest.SkipTest( 'Feast feature store test dependencies are not installed.') @@ -49,6 +53,15 @@ def test_feast_enrichment_no_feature_service(self): feature_store_yaml_path=self.feature_store_yaml_file, ) + @parameterized.expand([('user_id', _entity_row_fn), ('', None)]) + def test_feast_enrichment_invalid_args(self, entity_id, entity_row_fn): + with self.assertRaises(ValueError): + _ = FeastFeatureStoreEnrichmentHandler( + feature_store_yaml_path=self.feature_store_yaml_file, + entity_id=entity_id, + entity_row_fn=entity_row_fn, + ) + if __name__ == '__main__': unittest.main() From 42b698e52808e24ca7cc5479b8c12c2c9a245540 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 25 Apr 2024 11:54:19 -0400 Subject: [PATCH 6/6] update pydoc --- .../enrichment_handlers/feast_feature_store.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py index f1eec2f24417..dc2a71786f65 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py @@ -106,8 +106,6 @@ def __init__( """Initializes an instance of `FeastFeatureStoreEnrichmentHandler`. Args: - entity_id (str): entity name for the entity associated with the features. - The `entity_id` is used to extract the entity value from the input row. feature_store_yaml_path (str): The path to a YAML configuration file for the Feast feature store. See https://docs.feast.dev/reference/feature-repository/feature-store-yaml @@ -118,10 +116,14 @@ def __init__( the features to fetch from the online Feast feature store. full_feature_names (bool): Whether to use full feature names (including namespaces, etc.). Defaults to False. - entity_row_fn: a lambda function that returns a dictionary with - a mapping from the entity key column name to entity key value from the - input row. It is used to build/extract the entity dict for feature - retrieval. + entity_id (str): entity name for the entity associated with the features. + The `entity_id` is used to extract the entity value from the input row. + Please provide exactly one of `entity_id` or `entity_row_fn`. + entity_row_fn: a lambda function that takes an input `beam.Row` and + returns a dictionary with a mapping from the entity key column name to + entity key value. It is used to build/extract the entity dict for + feature retrieval. Please provide exactly one of `entity_id` or + `entity_row_fn`. See https://docs.feast.dev/getting-started/concepts/feature-retrieval for more information. exception_level: a `enum.Enum` value from