From ec37750639b923656c1c846418e6472ac6543fd2 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 17 Sep 2024 14:37:58 -0400 Subject: [PATCH] Flush the underlying file-like object. --- sdks/python/apache_beam/dataframe/io.py | 2 +- sdks/python/apache_beam/io/textio_test.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py index 364e06acceb0..5fcb7326a026 100644 --- a/sdks/python/apache_beam/dataframe/io.py +++ b/sdks/python/apache_beam/dataframe/io.py @@ -578,7 +578,7 @@ def _read(self, size=-1): return res def flush(self): - return + self._underlying.flush() class _ReadFromPandasDoFn(beam.DoFn, beam.RestrictionProvider): diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 78368471ee14..d1bfdf6bfd35 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -1737,8 +1737,8 @@ def test_non_utf8_csv_read_write(self): with TestPipeline() as p: r1 = ( p - | beam.io.ReadFromCsv(input_fn, encoding="latin1") - | beam.Map(lambda x: x._asdict())) + | 'Read' >> beam.io.ReadFromCsv(input_fn, encoding="latin1") + | 'ToDict' >> beam.Map(lambda x: x._asdict())) assert_that( r1, equal_to([{ @@ -1750,7 +1750,7 @@ def test_non_utf8_csv_read_write(self): with TestPipeline() as p: _ = ( p - | beam.io.ReadFromCsv(input_fn, encoding="latin1") + | 'Read' >> beam.io.ReadFromCsv(input_fn, encoding="latin1") | 'Write' >> beam.io.WriteToCsv( os.path.join(dest, 'out'), encoding="latin1"))