Skip to content

Commit

Permalink
update beam side docs for vertex ai enrichment
Browse files Browse the repository at this point in the history
  • Loading branch information
riteshghorse committed Mar 20, 2024
1 parent bb0dac5 commit 502ed37
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,70 @@ def enrichment_with_bigtable():
| "Enrich W/ BigTable" >> Enrichment(bigtable_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_bigtable]


def enrichment_with_vertex_ai():
  """Runs the Vertex AI Feature Store enrichment example pipeline.

  Builds three sales rows keyed by ``user_id``, passes them through the
  ``Enrichment`` transform configured with a
  ``VertexAIFeatureStoreEnrichmentHandler`` and prints every resulting row.
  The code between the START/END markers is the sample embedded in the docs.
  """
  # [START enrichment_with_vertex_ai]
  import apache_beam as beam
  from apache_beam.transforms.enrichment import Enrichment
  from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store \
    import VertexAIFeatureStoreEnrichmentHandler

  location = 'us-central1'

  # Input elements; `user_id` is the field used as the lookup key below.
  sales = [
      beam.Row(user_id='2963', product_id=14235, sale_price=15.0),
      beam.Row(user_id='21422', product_id=11203, sale_price=12.0),
      beam.Row(user_id='20592', product_id=8579, sale_price=9.0),
  ]

  feature_store_handler = VertexAIFeatureStoreEnrichmentHandler(
      project='apache-beam-testing',
      location=location,
      api_endpoint=f"{location}-aiplatform.googleapis.com",
      feature_store_name="vertexai_enrichment_example",
      feature_view_name="users",
      row_key="user_id",
  )

  with beam.Pipeline() as pipeline:
    enriched = (
        pipeline
        | "Create" >> beam.Create(sales)
        | "Enrich W/ Vertex AI" >> Enrichment(feature_store_handler))
    _ = enriched | "Print" >> beam.Map(print)
  # [END enrichment_with_vertex_ai]


def enrichment_with_vertex_ai_legacy():
  """Runs the Vertex AI Feature Store (Legacy) enrichment example pipeline.

  Builds three movie rows keyed by ``entity_id``, passes them through the
  ``Enrichment`` transform configured with a
  ``VertexAIFeatureStoreLegacyEnrichmentHandler`` and prints every resulting
  row. The code between the START/END markers is the sample embedded in the
  docs.
  """
  # [START enrichment_with_vertex_ai_legacy]
  import apache_beam as beam
  from apache_beam.transforms.enrichment import Enrichment
  from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store \
    import VertexAIFeatureStoreLegacyEnrichmentHandler

  location = 'us-central1'

  # Input elements; `entity_id` is the field used as the lookup key below.
  movies = [
      beam.Row(entity_id="movie_01", title='The Shawshank Redemption'),
      beam.Row(entity_id="movie_02", title="The Shining"),
      beam.Row(entity_id="movie_04", title='The Dark Knight'),
  ]

  legacy_handler = VertexAIFeatureStoreLegacyEnrichmentHandler(
      project='apache-beam-testing',
      location=location,
      api_endpoint=f"{location}-aiplatform.googleapis.com",
      entity_type_id='movies',
      feature_store_id="movie_prediction_unique",
      feature_ids=["title", "genres"],
      row_key="entity_id",
  )

  with beam.Pipeline() as pipeline:
    enriched = (
        pipeline
        | "Create" >> beam.Create(movies)
        | "Enrich W/ Vertex AI" >> Enrichment(legacy_handler))
    _ = enriched | "Print" >> beam.Map(print)
  # [END enrichment_with_vertex_ai_legacy]
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

# pylint: disable=unused-import
try:
from apache_beam.examples.snippets.transforms.elementwise.enrichment import enrichment_with_bigtable
from apache_beam.examples.snippets.transforms.elementwise.enrichment import enrichment_with_bigtable, \
enrichment_with_vertex_ai_legacy
from apache_beam.examples.snippets.transforms.elementwise.enrichment import enrichment_with_vertex_ai
from apache_beam.io.requestresponse import RequestResponseIO
except ImportError:
raise unittest.SkipTest('RequestResponseIO dependencies are not installed')
Expand All @@ -40,6 +42,24 @@ def validate_enrichment_with_bigtable():
return expected


def validate_enrichment_with_vertex_ai():
  """Returns the expected printed rows for the Vertex AI enrichment example.

  The literal keeps its START/END marker lines so the docs can embed it as
  the sample output; those two lines are dropped from the returned list.
  """
  marked_output = '''[START enrichment_with_vertex_ai]
Row(user_id='2963', product_id=14235, sale_price=15.0, age=29.0, gender='1', state='97', country='2')
Row(user_id='21422', product_id=11203, sale_price=12.0, age=36.0, state='184', gender='1', country='5')
Row(user_id='20592', product_id=8579, sale_price=9.0, age=30.0, state='86', gender='1', country='4')
[END enrichment_with_vertex_ai]'''
  # Discard the first (START) and last (END) marker lines.
  _, *rows, _ = marked_output.splitlines()
  return rows


def validate_enrichment_with_vertex_ai_legacy():
  """Returns the expected printed rows for the legacy Vertex AI example.

  The literal keeps its START/END marker lines so the docs can embed it as
  the sample output; those two lines are dropped from the returned list.
  """
  marked_output = '''[START enrichment_with_vertex_ai_legacy]
Row(entity_id='movie_01', title='The Shawshank Redemption', genres='Drama')
Row(entity_id='movie_02', title='The Shining', genres='Horror')
Row(entity_id='movie_04', title='The Dark Knight', genres='Action')
[END enrichment_with_vertex_ai_legacy]'''
  # Discard the first (START) and last (END) marker lines.
  _, *rows, _ = marked_output.splitlines()
  return rows


@mock.patch('sys.stdout', new_callable=StringIO)
class EnrichmentTest(unittest.TestCase):
def test_enrichment_with_bigtable(self, mock_stdout):
Expand All @@ -48,6 +68,21 @@ def test_enrichment_with_bigtable(self, mock_stdout):
expected = validate_enrichment_with_bigtable()
self.assertEqual(output, expected)

def test_enrichment_with_vertex_ai(self, mock_stdout):
  """Checks that the Vertex AI example prints the expected rows.

  Args:
    mock_stdout: replacement for ``sys.stdout`` that captures the rows
      printed by the example pipeline.
  """
  enrichment_with_vertex_ai()
  output = mock_stdout.getvalue().splitlines()
  expected = validate_enrichment_with_vertex_ai()

  # Fail with a clear assertion when rows are missing or extra, instead of
  # an IndexError (too few rows) or a silent pass (too many rows).
  self.assertEqual(len(output), len(expected))

  # Rows are compared as sets of comma-separated fragments, so the check is
  # largely insensitive to field order (the expected rows themselves do not
  # all list the enriched fields in the same order).
  for actual_row, expected_row in zip(output, expected):
    self.assertEqual(
        set(actual_row.split(',')), set(expected_row.split(',')))

def test_enrichment_with_vertex_ai_legacy(self, mock_stdout):
  """Checks that the legacy Vertex AI example prints exactly the expected rows.

  Args:
    mock_stdout: replacement for ``sys.stdout`` that captures the rows
      printed by the example pipeline.
  """
  # Do not truncate the diff shown when the comparison fails.
  self.maxDiff = None
  enrichment_with_vertex_ai_legacy()
  printed_rows = mock_stdout.getvalue().splitlines()
  self.assertEqual(printed_rows, validate_enrichment_with_vertex_ai_legacy())


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
-->

# Enrichment transform
# Use Bigtable to enrich data

{{< localstorage language language-py >}}

Expand All @@ -29,21 +29,20 @@ limitations under the License.
</tr>
</table>


The following example demonstrates how to create a pipeline that does data enrichment with Cloud Bigtable.

## Example: BigTableEnrichmentHandler
In Apache Beam 2.54.0 and later versions, the enrichment transform includes a built-in enrichment handler for [Bigtable](https://cloud.google.com/bigtable/docs/overview).
The following example demonstrates how to create a pipeline that uses the enrichment transform with `BigTableEnrichmentHandler`.

The data stored in the Bigtable cluster uses the following format:

{{ table }}
| Row key | product:product_id | product:product_name | product:product_stock |
|:---------:|:--------------------:|:----------------------:|:-----------------------:|
| 1 | 1 | pixel 5 | 2 |
| 2 | 2 | pixel 6 | 4 |
| 3 | 3 | pixel 7 | 20 |
| 4 | 4 | pixel 8 | 10 |
{{ /table }}
{{< table >}}
| Row key | product:product_id | product:product_name | product:product_stock |
|:-----------:|:--------------------:|:----------------------:|:-----------------------:|
| 1 | 1 | pixel 5 | 2 |
| 2 | 2 | pixel 6 | 4 |
| 3 | 3 | pixel 7 | 20 |
| 4 | 4 | pixel 8 | 10 |
{{< /table >}}


{{< highlight language="py" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_bigtable >}}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
---
title: "Enrichment"
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# Enrichment with Google Cloud Vertex AI Feature Store

{{< localstorage language language-py >}}

<table>
<tr>
<td>
<a>
{{< button-pydoc path="apache_beam.transforms" class="Enrichment" >}}
</a>
</td>
</tr>
</table>


In Apache Beam 2.55.0 and later versions, the enrichment transform includes a built-in enrichment handler for [Vertex AI Feature Store](https://cloud.google.com/vertex-ai/docs/featurestore).
The following examples demonstrate how to create pipelines that use the enrichment transform with `VertexAIFeatureStoreEnrichmentHandler` and `VertexAIFeatureStoreLegacyEnrichmentHandler`.

## Example 1: Enrichment with Vertex AI Feature Store

The precomputed feature values stored in Vertex AI Feature Store use the following format:

{{< table >}}
| user_id | age | gender | state | country |
|:--------:|:----:|:------:|:-----:|:-------:|
| 21422 | 12 | 0 | 0 | 0 |
| 2963 | 12 | 1 | 1 | 1 |
| 20592 | 12 | 1 | 2 | 2 |
| 76538 | 12 | 1 | 3 | 0 |
{{< /table >}}


{{< highlight language="py" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_vertex_ai >}}
{{</ highlight >}}

{{< paragraph class="notebook-skip" >}}
Output:
{{< /paragraph >}}
{{< highlight class="notebook-skip" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_vertex_ai >}}
{{< /highlight >}}

## Example 2: Enrichment with Vertex AI Feature Store (Legacy)

The precomputed feature values stored in Vertex AI Feature Store (Legacy) use the following format:

{{< table >}}
| entity_id | title | genres |
|:---------:|:------------------------:|:------:|
| movie_01 | The Shawshank Redemption | Drama |
| movie_02 | The Shining | Horror |
| movie_04 | The Dark Knight | Action |
{{< /table >}}

{{< highlight language="py" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_vertex_ai_legacy >}}
{{</ highlight >}}

{{< paragraph class="notebook-skip" >}}
Output:
{{< /paragraph >}}
{{< highlight class="notebook-skip" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_vertex_ai_legacy >}}
{{< /highlight >}}


## Related transforms

Not applicable.

{{< button-pydoc path="apache_beam.transforms" class="Enrichment" >}}
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ limitations under the License.


The enrichment transform lets you dynamically enrich data in a pipeline by doing a key-value lookup to a remote service. The transform uses [`RequestResponseIO`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.RequestResponseIO) internally. This feature uses client-side throttling to ensure that the remote service isn't overloaded with requests. If service-side errors occur, like `TooManyRequests` and `Timeout` exceptions, it retries the requests by using exponential backoff.

This transform is available in Apache Beam 2.54.0 and later versions.

## Examples

The following examples demonstrate how to create pipelines that use the enrichment transform to enrich data from external services.

{{< table >}}
| Service | Beam version | Example |
|:----------------------------------------------|:-------------|:--------------------------------------------|
| Google Cloud Bigtable | \>= 2.54.0 | Bigtable Enrichment |
| Google Cloud Vertex AI Feature Store | \>= 2.55.0 | Vertex AI Feature Store Enrichment |
| Google Cloud Vertex AI Feature Store (Legacy) | \>= 2.55.0 | Vertex AI Feature Store (Legacy) Enrichment |
| Service | Example |
|:----------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Google Cloud Bigtable | [Bigtable Enrichment](/documentation/transforms/python/elementwise/enrichment-bigtable/#example) |
| Google Cloud Vertex AI Feature Store | [Vertex AI Feature Store Enrichment](/documentation/transforms/python/elementwise/enrichment-vertexai/#example-1-enrichment-with-vertex-ai-feature-store) |
| Google Cloud Vertex AI Feature Store (Legacy) | [Vertex AI Feature Store (Legacy) Enrichment](/documentation/transforms/python/elementwise/enrichment-vertexai/#example-2-enrichment-with-vertex-ai-feature-store-legacy) |
{{< /table >}}

## Related transforms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,14 @@
<span class="section-nav-list-title">Element-wise</span>

<ul class="section-nav-list">
<li><a href="/documentation/transforms/python/elementwise/enrichment/">Enrichment</a></li>
<li class="section-nav-item--collapsible">
<span class="section-nav-list-title">Enrichment</span>
<ul class="section-nav-list">
<li><a href="/documentation/transforms/python/elementwise/enrichment/">Overview</a></li>
<li><a href="/documentation/transforms/python/elementwise/enrichment-bigtable/">Bigtable example</a></li>
<li><a href="/documentation/transforms/python/elementwise/enrichment-vertexai/">Vertex AI Feature Store examples</a></li>
</ul>
</li>
<li><a href="/documentation/transforms/python/elementwise/filter/">Filter</a></li>
<li><a href="/documentation/transforms/python/elementwise/flatmap/">FlatMap</a></li>
<li><a href="/documentation/transforms/python/elementwise/keys/">Keys</a></li>
Expand Down

0 comments on commit 502ed37

Please sign in to comment.