From 142e39250db74d3d7c1491c8683c291948a86751 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 23 Aug 2024 14:25:13 -0700 Subject: [PATCH 1/2] Preserve numeric string literals when reading from json. This avoids surprises like https://github.com/pandas-dev/pandas/issues/42471 --- CHANGES.md | 5 +++++ sdks/python/apache_beam/io/textio.py | 14 ++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 60ba3ca37446..c194cf730785 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -70,6 +70,11 @@ ## Breaking Changes +* In Python and YAML, ReadFromJson now override the dtype from None to + an explicit False. Most notably, string values like `"123"` are preserved + as strings rather than silently coerced (and possibly truncated) to numeric + values. To retain the old behavior, pass `dtype=True` (or any other value + accepted by `pandas.read_json`). * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). ## Deprecations diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index 3de9709d7362..0d7803bcabb1 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -23,7 +23,9 @@ from functools import partial from typing import TYPE_CHECKING from typing import Any +from typing import Dict from typing import Optional +from typing import Union from apache_beam import typehints from apache_beam.coders import coders @@ -980,7 +982,12 @@ def WriteToCsv( @append_pandas_args(pandas.read_json, exclude=['path_or_buf']) def ReadFromJson( - path: str, *, orient: str = 'records', lines: bool = True, **kwargs): + path: str, + *, + orient: str = 'records', + lines: bool = True, + dtype: Union[bool, Dict[str, Any]] = False, + **kwargs): """A PTransform for reading json values from files into a PCollection. Args: @@ -992,11 +999,14 @@ def ReadFromJson( lines (bool): Whether each line should be considered a separate record, as opposed to the entire file being a valid JSON object or list. Defaults to True (unlike Pandas). + dtype (bool): If True, infer dtypes; if a dict of column to dtype, + then use those; if False, then don’t infer dtypes at all. + Defaults to False (unlike Pandas). **kwargs: Extra arguments passed to `pandas.read_json` (see below). """ from apache_beam.dataframe.io import ReadViaPandas return 'ReadFromJson' >> ReadViaPandas( - 'json', path, orient=orient, lines=lines, **kwargs) + 'json', path, orient=orient, lines=lines, dtype=dtype, **kwargs) @append_pandas_args( pandas.DataFrame.to_json, exclude=['path_or_buf', 'index']) From 679e5cc6ff8981db07e745a936da66ddb4502c0c Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 23 Aug 2024 14:36:11 -0700 Subject: [PATCH 2/2] Add a test. --- sdks/python/apache_beam/io/textio_test.py | 30 +++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 1d852d171324..b134d615e20e 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -1743,6 +1743,36 @@ def test_json_read_write(self): assert_that(pcoll, equal_to(records)) + def test_numeric_strings_preserved(self): + records = [ + beam.Row( + as_string=str(ix), + as_float_string=str(float(ix)), + as_int=ix, + as_float=float(ix)) for ix in range(3) + ] + with tempfile.TemporaryDirectory() as dest: + with TestPipeline() as p: + # pylint: disable=expression-not-assigned + p | beam.Create(records) | beam.io.WriteToJson( + os.path.join(dest, 'out')) + with TestPipeline() as p: + pcoll = ( + p + | beam.io.ReadFromJson(os.path.join(dest, 'out*')) + | beam.Map(lambda t: beam.Row(**dict(zip(type(t)._fields, t))))) + + assert_that(pcoll, equal_to(records)) + + # This test should be redundant as Python equality does not equate + # numeric values with their string representations, but this is much + # more explicit about what we're asserting here. + def check_types(element): + for a, b in zip(element, records[0]): + assert type(a) == type(b), (a, b, type(a), type(b)) + + _ = pcoll | beam.Map(check_types) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO)