From 418a1d40fff8924de3e13dee19fefe8a5cfb50e8 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 17 Oct 2024 09:03:35 -0700 Subject: [PATCH] Use WindowedValueParam for interactive cache. --- .../apache_beam/runners/interactive/caching/reify.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/caching/reify.py b/sdks/python/apache_beam/runners/interactive/caching/reify.py index ce82785b2585..c82033dc1b9b 100644 --- a/sdks/python/apache_beam/runners/interactive/caching/reify.py +++ b/sdks/python/apache_beam/runners/interactive/caching/reify.py @@ -28,7 +28,6 @@ import apache_beam as beam from apache_beam.runners.interactive import cache_manager as cache from apache_beam.testing import test_stream -from apache_beam.transforms.window import WindowedValue READ_CACHE = 'ReadCache_' WRITE_CACHE = 'WriteCache_' @@ -40,13 +39,8 @@ class Reify(beam.DoFn): Internally used to capture window info with each element into cache for replayability. """ - def process( - self, - e, - w=beam.DoFn.WindowParam, - p=beam.DoFn.PaneInfoParam, - t=beam.DoFn.TimestampParam): - yield test_stream.WindowedValueHolder(WindowedValue(e, t, [w], p)) + def process(self, e, wv=beam.DoFn.WindowedValueParam): + yield test_stream.WindowedValueHolder(wv) class Unreify(beam.DoFn):