From 3cc00799c76ada58275002e1a35805a51f633fcf Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sun, 15 Sep 2024 00:40:10 -0400 Subject: [PATCH 1/4] Fix non-utf8 encoding problem in ReadFromCsv and WritetoCsv. --- sdks/python/apache_beam/dataframe/io.py | 16 +++++++++--- sdks/python/apache_beam/io/textio_test.py | 32 +++++++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py index b795add1b44e..b11de68d004e 100644 --- a/sdks/python/apache_beam/dataframe/io.py +++ b/sdks/python/apache_beam/dataframe/io.py @@ -280,7 +280,8 @@ def expand(self, root): first_path = match.metadata_list[0].path with io.filesystems.FileSystems.open(first_path) as handle: if not self.binary: - handle = TextIOWrapper(handle) + handle = TextIOWrapper(handle, + encoding=self.kwargs.get("encoding", None)) if self.incremental: with self.reader(handle, *self.args, **dict(self.kwargs, chunksize=100)) as stream: @@ -493,6 +494,10 @@ def __init__(self, underlying, tracker, splitter): self._buffer, self._underlying) self._buffer_start_pos += len(skip) + @property + def mode(self): + return getattr(self._underlying, "mode", None) + def readable(self): return True @@ -572,6 +577,9 @@ def _read(self, size=-1): self._done = True return res + def flush(self): + return + class _ReadFromPandasDoFn(beam.DoFn, beam.RestrictionProvider): def __init__(self, reader, args, kwargs, binary, incremental, splitter): @@ -627,7 +635,8 @@ def process( splitter=self.splitter or _DelimSplitter(b'\n', _DEFAULT_BYTES_CHUNKSIZE)) if not self.binary: - handle = TextIOWrapper(handle) + handle = TextIOWrapper(handle, + encoding=self.kwargs.get("encoding", None)) if self.incremental: if 'chunksize' not in self.kwargs: self.kwargs['chunksize'] = _DEFAULT_LINES_CHUNKSIZE @@ -688,7 +697,8 @@ def open(self, file_handle): self.buffer = [] self.empty = self.header = self.footer = None if not self.binary: - file_handle = TextIOWrapper(file_handle) + file_handle = TextIOWrapper(file_handle, + encoding=self.kwargs.get("encoding", None)) self.file_handle = file_handle def write_to(self, df, file_handle=None): diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index b134d615e20e..83b24f3925e2 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -1726,6 +1726,38 @@ def test_csv_read_write(self): assert_that(pcoll, equal_to(records)) + def test_non_utf8_csv_read_write(self): + content = b"\xe0,\xe1,\xe2\n0,1,2\n1,2,3\n" + + with tempfile.TemporaryDirectory() as dest: + input_fn = os.path.join(dest, 'input.csv') + with open(input_fn, 'wb') as f: + f.write(content) + + with TestPipeline() as p: + r1 = ( + p + | beam.io.ReadFromCsv(input_fn, encoding="latin1") + | beam.Map(lambda x: x._asdict())) + assert_that(r1, equal_to([{"\u00e0": 0, "\u00e1": 1, "\u00e2": 2}, + {"\u00e0": 1, "\u00e1": 2, "\u00e2": 3}])) + + with TestPipeline() as p: + _ = ( + p + | beam.io.ReadFromCsv(input_fn, encoding="latin1") + | 'Write' >> beam.io.WriteToCsv(os.path.join(dest, 'out'), + encoding="latin1")) + + with TestPipeline() as p: + r2 = ( + p + | 'Read' >> beam.io.ReadFromCsv(os.path.join(dest, 'out*'), + encoding="latin1") + | 'ToDict' >> beam.Map(lambda x: x._asdict())) + assert_that(r2, equal_to([{"\u00e0": 0, "\u00e1": 1, "\u00e2": 2}, + {"\u00e0": 1, "\u00e1": 2, "\u00e2": 3}])) + class JsonTest(unittest.TestCase): def test_json_read_write(self): From f81c475f02a50e005cecc01f1a0c9e62c333398d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sun, 15 Sep 2024 00:52:22 -0400 Subject: [PATCH 2/4] Apply yapf --- sdks/python/apache_beam/dataframe/io.py | 12 +++++----- sdks/python/apache_beam/io/textio_test.py | 28 +++++++++++++++-------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py index b11de68d004e..23bfc52bed03 100644 --- a/sdks/python/apache_beam/dataframe/io.py +++ b/sdks/python/apache_beam/dataframe/io.py @@ -280,8 +280,8 @@ def expand(self, root): first_path = match.metadata_list[0].path with io.filesystems.FileSystems.open(first_path) as handle: if not self.binary: - handle = TextIOWrapper(handle, - encoding=self.kwargs.get("encoding", None)) + handle = TextIOWrapper( + handle, encoding=self.kwargs.get("encoding", None)) if self.incremental: with self.reader(handle, *self.args, **dict(self.kwargs, chunksize=100)) as stream: @@ -635,8 +635,8 @@ def process( splitter=self.splitter or _DelimSplitter(b'\n', _DEFAULT_BYTES_CHUNKSIZE)) if not self.binary: - handle = TextIOWrapper(handle, - encoding=self.kwargs.get("encoding", None)) + handle = TextIOWrapper( + handle, encoding=self.kwargs.get("encoding", None)) if self.incremental: if 'chunksize' not in self.kwargs: self.kwargs['chunksize'] = _DEFAULT_LINES_CHUNKSIZE @@ -697,8 +697,8 @@ def open(self, file_handle): self.buffer = [] self.empty = self.header = self.footer = None if not self.binary: - file_handle = TextIOWrapper(file_handle, - encoding=self.kwargs.get("encoding", None)) + file_handle = TextIOWrapper( + file_handle, encoding=self.kwargs.get("encoding", None)) self.file_handle = file_handle def write_to(self, df, file_handle=None): diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 83b24f3925e2..78368471ee14 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -1739,24 +1739,34 @@ def test_non_utf8_csv_read_write(self): p | beam.io.ReadFromCsv(input_fn, encoding="latin1") | beam.Map(lambda x: x._asdict())) - assert_that(r1, equal_to([{"\u00e0": 0, "\u00e1": 1, "\u00e2": 2}, - {"\u00e0": 1, "\u00e1": 2, "\u00e2": 3}])) + assert_that( + r1, + equal_to([{ + "\u00e0": 0, "\u00e1": 1, "\u00e2": 2 + }, { + "\u00e0": 1, "\u00e1": 2, "\u00e2": 3 + }])) with TestPipeline() as p: _ = ( p - | beam.io.ReadFromCsv(input_fn, encoding="latin1") - | 'Write' >> beam.io.WriteToCsv(os.path.join(dest, 'out'), - encoding="latin1")) + | beam.io.ReadFromCsv(input_fn, encoding="latin1") + | 'Write' >> beam.io.WriteToCsv( + os.path.join(dest, 'out'), encoding="latin1")) with TestPipeline() as p: r2 = ( p - | 'Read' >> beam.io.ReadFromCsv(os.path.join(dest, 'out*'), - encoding="latin1") + | 'Read' >> beam.io.ReadFromCsv( + os.path.join(dest, 'out*'), encoding="latin1") | 'ToDict' >> beam.Map(lambda x: x._asdict())) - assert_that(r2, equal_to([{"\u00e0": 0, "\u00e1": 1, "\u00e2": 2}, - {"\u00e0": 1, "\u00e1": 2, "\u00e2": 3}])) + assert_that( + r2, + equal_to([{ + "\u00e0": 0, "\u00e1": 1, "\u00e2": 2 + }, { + "\u00e0": 1, "\u00e1": 2, "\u00e2": 3 + }])) class JsonTest(unittest.TestCase): From 8088ecfffb3567e9cb4734ac5b899a0529ff9151 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sun, 15 Sep 2024 16:42:58 -0400 Subject: [PATCH 3/4] Set default file mode to read for _TruncatingFileHandle. --- sdks/python/apache_beam/dataframe/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py index 23bfc52bed03..364e06acceb0 100644 --- a/sdks/python/apache_beam/dataframe/io.py +++ b/sdks/python/apache_beam/dataframe/io.py @@ -496,7 +496,7 @@ def __init__(self, underlying, tracker, splitter): @property def mode(self): - return getattr(self._underlying, "mode", None) + return getattr(self._underlying, "mode", "r") def readable(self): return True From ec37750639b923656c1c846418e6472ac6543fd2 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 17 Sep 2024 14:37:58 -0400 Subject: [PATCH 4/4] Flush the underlying file-like object. --- sdks/python/apache_beam/dataframe/io.py | 2 +- sdks/python/apache_beam/io/textio_test.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py index 364e06acceb0..5fcb7326a026 100644 --- a/sdks/python/apache_beam/dataframe/io.py +++ b/sdks/python/apache_beam/dataframe/io.py @@ -578,7 +578,7 @@ def _read(self, size=-1): return res def flush(self): - return + self._underlying.flush() class _ReadFromPandasDoFn(beam.DoFn, beam.RestrictionProvider): diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 78368471ee14..d1bfdf6bfd35 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -1737,8 +1737,8 @@ def test_non_utf8_csv_read_write(self): with TestPipeline() as p: r1 = ( p - | beam.io.ReadFromCsv(input_fn, encoding="latin1") - | beam.Map(lambda x: x._asdict())) + | 'Read' >> beam.io.ReadFromCsv(input_fn, encoding="latin1") + | 'ToDict' >> beam.Map(lambda x: x._asdict())) assert_that( r1, equal_to([{ @@ -1750,7 +1750,7 @@ def test_non_utf8_csv_read_write(self): with TestPipeline() as p: _ = ( p - | beam.io.ReadFromCsv(input_fn, encoding="latin1") + | 'Read' >> beam.io.ReadFromCsv(input_fn, encoding="latin1") | 'Write' >> beam.io.WriteToCsv( os.path.join(dest, 'out'), encoding="latin1"))