Skip to content

Commit

Permalink
Allow overriding joinfn and custom types from EnrichmentSourceHandler.
Browse files Browse the repository at this point in the history
  • Loading branch information
claudevdm committed Dec 19, 2024
1 parent 153e48e commit 311d1c7
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions sdks/python/apache_beam/transforms/enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,22 @@ class Enrichment(beam.PTransform[beam.PCollection[InputT],
def __init__(
self,
source_handler: EnrichmentSourceHandler,
join_fn: JoinFn = cross_join,
join_fn: Optional[JoinFn] = None,
timeout: Optional[float] = DEFAULT_TIMEOUT_SECS,
repeater: Repeater = ExponentialBackOffRepeater(),
throttler: PreCallThrottler = DefaultThrottler()):
throttler: PreCallThrottler = DefaultThrottler(),
use_custom_types: bool = False):
self._cache = None
self._source_handler = source_handler
self._join_fn = join_fn
self._join_fn = (
join_fn if join_fn else source_handler.join_fn if hasattr(
source_handler, 'join_fn') else cross_join)
self._timeout = timeout
self._repeater = repeater
self._throttler = throttler
self._use_custom_types = (
source_handler.use_custom_types if hasattr(
source_handler, 'use_custom_types') else use_custom_types)

def expand(self,
input_row: beam.PCollection[InputT]) -> beam.PCollection[OutputT]:
Expand All @@ -165,8 +171,9 @@ def expand(self,
# EnrichmentSourceHandler returns a tuple of (request,response).
return (
fetched_data
| "enrichment_join" >>
beam.Map(lambda x: self._join_fn(x[0]._asdict(), x[1]._asdict())))
| "enrichment_join" >> beam.Map(
lambda x: self._join_fn(x[0]._asdict(), x[1]._asdict())
if not self._use_custom_types else self._join_fn(x[0], x[1])))

def with_redis_cache(
self,
Expand Down

0 comments on commit 311d1c7

Please sign in to comment.