Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[yaml] Add enrichment transform to Beam YAML #32286

Merged
merged 11 commits into from
Oct 17, 2024
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/yaml/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def temp_bigquery_table(project, prefix='yaml_bq_it_'):
dataset_id = '%s_%s' % (prefix, uuid.uuid4().hex)
bigquery_client.get_or_create_dataset(project, dataset_id)
logging.info("Created dataset %s in project %s", dataset_id, project)
yield f'{project}:{dataset_id}.tmp_table'
yield f'{project}.{dataset_id}.tmp_table'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this change needed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC this is reused by EnrichmentTransform's BQ handler which uses the non-legacy form and BigQueryIO accepts both, so this should be safe

request = bigquery.BigqueryDatasetsDeleteRequest(
projectId=project, datasetId=dataset_id, deleteContents=True)
logging.info("Deleting dataset %s in project %s", dataset_id, project)
Expand Down
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/yaml/standard_providers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,8 @@
Explode: "beam:schematransform:org.apache.beam:yaml:explode:v1"
config:
gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'

- type: 'python'
config: {}
transforms:
Enrichment: 'apache_beam.yaml.yaml_enrichment.enrichment_transform'
84 changes: 84 additions & 0 deletions sdks/python/apache_beam/yaml/tests/enrichment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

fixtures:
- name: BQ_TABLE
type: "apache_beam.yaml.integration_tests.temp_bigquery_table"
config:
project: "apache-beam-testing"
- name: TEMP_DIR
type: "apache_beam.yaml.integration_tests.gcs_temp_dir"
config:
bucket: "gs://temp-storage-for-end-to-end-tests/temp-it"

pipelines:
- pipeline:
type: chain
transforms:
- type: Create
name: Rows
config:
elements:
- {label: '11a', rank: 0}
- {label: '37a', rank: 1}
- {label: '389a', rank: 2}

- type: WriteToBigQuery
config:
table: "{BQ_TABLE}"

- pipeline:
type: chain
transforms:
- type: Create
name: Data
config:
elements:
- {label: '11a', name: 'S1'}
- {label: '37a', name: 'S2'}
- {label: '389a', name: 'S3'}
- type: Enrichment
name: Enriched
config:
enrichment_handler: 'BigQuery'
handler_config:
project: apache-beam-testing
table_name: "{BQ_TABLE}"
fields: ['label']
row_restriction_template: "label = '37a'"
timeout: 30

- type: MapToFields
config:
language: python
fields:
label:
callable: 'lambda x: x.label'
output_type: string
rank:
callable: 'lambda x: x.rank'
output_type: integer
name:
callable: 'lambda x: x.name'
output_type: string

- type: AssertEqual
config:
elements:
- {label: '37a', rank: 1, name: 'S2'}
options:
yaml_experimental_features: [ 'Enrichment' ]
126 changes: 126 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_enrichment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Any
from typing import Dict
from typing import Optional

import apache_beam as beam
from apache_beam.yaml import options

try:
from apache_beam.transforms.enrichment import Enrichment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps not to fix in this PR (but if not let's file a bug). It seems that Enrichment itself should not require GCP, only specific handlers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

I think the only class imported is a custom Exception class

from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler
except ImportError:
Enrichment = None # type: ignore
BigQueryEnrichmentHandler = None # type: ignore
BigTableEnrichmentHandler = None # type: ignore
VertexAIFeatureStoreEnrichmentHandler = None # type: ignore

try:
from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler
except ImportError:
FeastFeatureStoreEnrichmentHandler = None # type: ignore


@beam.ptransform.ptransform_fn
def enrichment_transform(
pcoll,
enrichment_handler: str,
handler_config: Dict[str, Any],
timeout: Optional[float] = 30):
"""
The Enrichment transform allows you to dynamically
enhance elements in a pipeline by performing key-value
lookups against external services like APIs or databases.

Args:
enrichment_handler: Specifies the source from
where data needs to be extracted
into the pipeline for enriching data.
It can be a string value in ["BigQuery",
"BigTable", "FeastFeatureStore",
"VertexAIFeatureStore"].
handler_config: Specifies the parameters for
the respective enrichment_handler in a dictionary format.
BigQuery = (
"BigQuery: "
"project, table_name, row_restriction_template, "
"fields, column_names, "condition_value_fn, "
"query_fn, min_batch_size, max_batch_size"
)

BigTable = (
"BigTable: "
"project_id, instance_id, table_id, "
"row_key, row_filter, app_profile_id, "
"encoding, ow_key_fn, exception_level, include_timestamp"
)

FeastFeatureStore = (
"FeastFeatureStore: "
"feature_store_yaml_path, feature_names, "
"feature_service_name, full_feature_names, "
"entity_row_fn, exception_level"
)

VertexAIFeatureStore = (
"VertexAIFeatureStore: "
"project, location, api_endpoint, feature_store_name, "
"feature_view_name, row_key, exception_level"
)

Example Usage:

- type: Enrichment
config:
enrichment_handler: 'BigTable'
handler_config:
project_id: 'apache-beam-testing'
instance_id: 'beam-test'
table_id: 'bigtable-enrichment-test'
row_key: 'product_id'
timeout: 30

"""
options.YamlOptions.check_enabled(pcoll.pipeline, 'Enrichment')

if not Enrichment:
raise ValueError(
f"gcp dependencies not installed. Cannot use {enrichment_handler} "
f"handler. Please install using 'pip install apache-beam[gcp]'.")

if (enrichment_handler == 'FeastFeatureStore' and
not FeastFeatureStoreEnrichmentHandler):
raise ValueError(
"FeastFeatureStore handler requires 'feast' package to be installed. " +
"Please install using 'pip install feast[gcp]' and try again.")

handler_map = {
reeba212 marked this conversation as resolved.
Show resolved Hide resolved
'BigQuery': BigQueryEnrichmentHandler,
'BigTable': BigTableEnrichmentHandler,
'FeastFeatureStore': FeastFeatureStoreEnrichmentHandler,
'VertexAIFeatureStore': VertexAIFeatureStoreEnrichmentHandler
}

if enrichment_handler not in handler_map:
raise ValueError(f"Unknown enrichment source: {enrichment_handler}")

handler = handler_map[enrichment_handler](**handler_config)
return pcoll | Enrichment(source_handler=handler, timeout=timeout)
75 changes: 75 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_enrichment_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import unittest

import mock

import apache_beam as beam
from apache_beam import Row
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.yaml.yaml_transform import YamlTransform


class FakeEnrichmentTransform:
def __init__(self, enrichment_handler, handler_config, timeout=30):
self._enrichment_handler = enrichment_handler
self._handler_config = handler_config
self._timeout = timeout

def __call__(self, enrichment_handler, *, handler_config, timeout=30):
assert enrichment_handler == self._enrichment_handler
assert handler_config == self._handler_config
assert timeout == self._timeout
return beam.Map(lambda x: beam.Row(**x._asdict()))


class EnrichmentTransformTest(unittest.TestCase):
def test_enrichment_with_bigquery(self):
input_data = [
Row(label="item1", rank=0),
Row(label="item2", rank=1),
]

handler = 'BigQuery'
config = {
"project": "apache-beam-testing",
"table_name": "project.database.table",
"row_restriction_template": "label='item1' or label='item2'",
"fields": ["label"]
}

with beam.Pipeline() as p:
with mock.patch('apache_beam.yaml.yaml_enrichment.enrichment_transform',
FakeEnrichmentTransform(enrichment_handler=handler,
handler_config=config)):
input_pcoll = p | 'CreateInput' >> beam.Create(input_data)
result = input_pcoll | YamlTransform(
f'''
type: Enrichment
config:
enrichment_handler: {handler}
handler_config: {config}
''')
assert_that(result, equal_to(input_data))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
Loading