Merge pull request #32463 from shunping/readfromcsv-encoding
Fix non-utf8 encoding problem in ReadFromCsv and WriteToCsv.
liferoad authored Sep 17, 2024
2 parents 85581a3 + 4b8cc5f commit fc4db69
Showing 2 changed files with 55 additions and 3 deletions.
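For orientation, the user-facing effect of this merge: ReadFromCsv and WriteToCsv already forward their keyword arguments (including encoding) to the underlying pandas calls, and after this fix the encoding also controls how the raw byte streams are wrapped for text decoding. A minimal usage sketch; the paths are illustrative:

    import apache_beam as beam

    with beam.Pipeline() as p:
        # 'encoding' is forwarded to pandas.read_csv and, after this fix,
        # also to the TextIOWrapper around the raw byte stream.
        rows = p | beam.io.ReadFromCsv('/tmp/input.csv', encoding='latin1')
        _ = rows | beam.io.WriteToCsv('/tmp/out', encoding='latin1')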
16 changes: 13 additions & 3 deletions sdks/python/apache_beam/dataframe/io.py
@@ -280,7 +280,8 @@ def expand(self, root):
     first_path = match.metadata_list[0].path
     with io.filesystems.FileSystems.open(first_path) as handle:
       if not self.binary:
-        handle = TextIOWrapper(handle)
+        handle = TextIOWrapper(
+            handle, encoding=self.kwargs.get("encoding", None))
       if self.incremental:
         with self.reader(handle, *self.args, **dict(self.kwargs,
                                                     chunksize=100)) as stream:
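Why the explicit encoding matters here, as a standalone illustration outside Beam: TextIOWrapper with encoding=None falls back to the locale's preferred encoding, usually UTF-8, so latin-1 bytes previously failed to decode.

    import io

    data = "café\n".encode("latin1")  # b'caf\xe9\n', not valid UTF-8

    try:
        # The locale default (typically UTF-8) cannot decode the byte 0xe9:
        io.TextIOWrapper(io.BytesIO(data)).read()
    except UnicodeDecodeError as err:
        print(err)

    # An explicit encoding decodes the same bytes cleanly:
    print(io.TextIOWrapper(io.BytesIO(data), encoding="latin1").read())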
@@ -493,6 +494,10 @@ def __init__(self, underlying, tracker, splitter):
           self._buffer, self._underlying)
       self._buffer_start_pos += len(skip)

+  @property
+  def mode(self):
+    return getattr(self._underlying, "mode", "r")
+
   def readable(self):
     return True

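The mode property presumably exists because consumers of the handle (pandas' I/O helpers, for instance) may probe its mode attribute, and plain byte buffers expose no such attribute; the getattr fallback keeps duck-typing intact. A tiny illustration of the fallback, outside Beam:

    import io

    raw = io.BytesIO(b"a,b\n0,1\n")
    # io.BytesIO has no .mode, so raw.mode would raise AttributeError;
    # the fallback reports a sensible read mode instead.
    print(getattr(raw, "mode", "r"))    # -> 'r'

    with open(__file__, "rb") as f:
        print(getattr(f, "mode", "r"))  # -> 'rb'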
@@ -572,6 +577,9 @@ def _read(self, size=-1):
         self._done = True
       return res

+  def flush(self):
+    self._underlying.flush()
+

 class _ReadFromPandasDoFn(beam.DoFn, beam.RestrictionProvider):
   def __init__(self, reader, args, kwargs, binary, incremental, splitter):
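flush() is likely delegated because TextIOWrapper flushes its wrapped buffer, including when the text wrapper is closed, so a byte-stream wrapper without flush would raise AttributeError at that point. A sketch of that interaction with a stand-in stream, not Beam's class:

    import io

    class CountingBytes(io.BytesIO):
        flush_calls = 0

        def flush(self):
            self.flush_calls += 1
            super().flush()

    raw = CountingBytes("é\n".encode("latin1"))
    text = io.TextIOWrapper(raw, encoding="latin1")
    print(text.read())           # -> 'é'
    text.close()                 # TextIOWrapper.close() flushes the buffer
    print(raw.flush_calls >= 1)  # -> True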
@@ -627,7 +635,8 @@ def process(
             splitter=self.splitter or
             _DelimSplitter(b'\n', _DEFAULT_BYTES_CHUNKSIZE))
       if not self.binary:
-        handle = TextIOWrapper(handle)
+        handle = TextIOWrapper(
+            handle, encoding=self.kwargs.get("encoding", None))
       if self.incremental:
         if 'chunksize' not in self.kwargs:
           self.kwargs['chunksize'] = _DEFAULT_LINES_CHUNKSIZE
@@ -688,7 +697,8 @@ def open(self, file_handle):
     self.buffer = []
     self.empty = self.header = self.footer = None
     if not self.binary:
-      file_handle = TextIOWrapper(file_handle)
+      file_handle = TextIOWrapper(
+          file_handle, encoding=self.kwargs.get("encoding", None))
     self.file_handle = file_handle

   def write_to(self, df, file_handle=None):
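On the write side the same wrapping applies in reverse: the TextIOWrapper encodes the text pandas emits before it reaches the binary sink. A standalone sketch of that behavior with pandas alone:

    import io
    import pandas as pd

    sink = io.BytesIO()
    handle = io.TextIOWrapper(sink, encoding="latin1")
    pd.DataFrame({"à": [0, 1]}).to_csv(handle, index=False)
    handle.flush()
    print(sink.getvalue())  # -> b'\xe0\n0\n1\n'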
42 changes: 42 additions & 0 deletions sdks/python/apache_beam/io/textio_test.py
@@ -1726,6 +1726,48 @@ def test_csv_read_write(self):

     assert_that(pcoll, equal_to(records))

+  def test_non_utf8_csv_read_write(self):
+    content = b"\xe0,\xe1,\xe2\n0,1,2\n1,2,3\n"
+
+    with tempfile.TemporaryDirectory() as dest:
+      input_fn = os.path.join(dest, 'input.csv')
+      with open(input_fn, 'wb') as f:
+        f.write(content)
+
+      with TestPipeline() as p:
+        r1 = (
+            p
+            | 'Read' >> beam.io.ReadFromCsv(input_fn, encoding="latin1")
+            | 'ToDict' >> beam.Map(lambda x: x._asdict()))
+        assert_that(
+            r1,
+            equal_to([{
+                "\u00e0": 0, "\u00e1": 1, "\u00e2": 2
+            }, {
+                "\u00e0": 1, "\u00e1": 2, "\u00e2": 3
+            }]))
+
+      with TestPipeline() as p:
+        _ = (
+            p
+            | 'Read' >> beam.io.ReadFromCsv(input_fn, encoding="latin1")
+            | 'Write' >> beam.io.WriteToCsv(
+                os.path.join(dest, 'out'), encoding="latin1"))
+
+      with TestPipeline() as p:
+        r2 = (
+            p
+            | 'Read' >> beam.io.ReadFromCsv(
+                os.path.join(dest, 'out*'), encoding="latin1")
+            | 'ToDict' >> beam.Map(lambda x: x._asdict()))
+        assert_that(
+            r2,
+            equal_to([{
+                "\u00e0": 0, "\u00e1": 1, "\u00e2": 2
+            }, {
+                "\u00e0": 1, "\u00e1": 2, "\u00e2": 3
+            }]))
+

 class JsonTest(unittest.TestCase):
   def test_json_read_write(self):
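Why these test bytes exercise the fix: b"\xe0,\xe1,\xe2" is "à,á,â" in latin-1 but is not valid UTF-8, so without the encoding plumbing the read path fails at decode time:

    >>> b"\xe0,\xe1,\xe2".decode("utf-8")
    Traceback (most recent call last):
      ...
    UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe0 in position 0: invalid continuation byte
    >>> b"\xe0,\xe1,\xe2".decode("latin1")
    'à,á,â'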
