Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Add on_bad_lines for pyarrow #54643

Merged
merged 19 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions doc/source/whatsnew/v2.2.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,35 @@ including other versions of pandas.
Enhancements
~~~~~~~~~~~~

.. _whatsnew_220.enhancements.enhancement1:
.. _whatsnew_220.enhancements.pyarrow_on_bad_lines:
amithkk marked this conversation as resolved.
Show resolved Hide resolved

PyArrow engine support for handling malformed lines in CSV files
amithkk marked this conversation as resolved.
Show resolved Hide resolved
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Prior to this release, the ability to handle malformed lines in CSV files was
limited to the ``python`` engine with the use of the ``on_bad_lines`` parameter.
This release brings that capability to the `PyArrow <https://arrow.apache
.org/docs/python/index.html>`_ engine as well.

The implementation supports both specifying ``skip``, ``error`` and ``warn``
values as well as passing down a callable as defined in the `PyArrow
documentation <https://arrow.apache.org/docs/python/generated/pyarrow.csv.
ParseOptions.html#pyarrow.csv.ParseOptions.invalid_row_handler>`_

*Example Usage*

.. ipython:: python

from io import StringIO

bad_csv = """a,b,c
acol1,bcol1,ccol1
acol2,ccol2
"""
df_arrow = pd.read_csv(StringIO(bad_csv), engine='pyarrow',
dtype_backend='pyarrow', on_bad_lines='skip')
df_arrow


enhancement1
^^^^^^^^^^^^

.. _whatsnew_220.enhancements.enhancement2:

Expand Down
27 changes: 27 additions & 0 deletions pandas/io/parsers/arrow_parser_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from __future__ import annotations

from typing import TYPE_CHECKING
import warnings

from pandas._config import using_pyarrow_string_dtype

from pandas._libs import lib
from pandas.compat._optional import import_optional_dependency
from pandas.errors import ParserWarning
from pandas.util._exceptions import find_stack_level

from pandas.core.dtypes.inference import is_integer

Expand Down Expand Up @@ -85,6 +88,30 @@ def _get_pyarrow_options(self) -> None:
and option_name
in ("delimiter", "quote_char", "escape_char", "ignore_empty_lines")
}

if "on_bad_lines" in self.kwds:
amithkk marked this conversation as resolved.
Show resolved Hide resolved
on_bad_lines = self.kwds["on_bad_lines"]
amithkk marked this conversation as resolved.
Show resolved Hide resolved
if callable(on_bad_lines):
self.parse_options["invalid_row_handler"] = on_bad_lines
elif on_bad_lines == ParserBase.BadLineHandleMethod.ERROR:
self.parse_options[
"invalid_row_handler"
] = None # PyArrow raises an exception by default
elif on_bad_lines == ParserBase.BadLineHandleMethod.WARN:

def handle_warning(invalid_row):
warnings.warn(
f"Expected {invalid_row.expected_columns} columns, but found "
f"{invalid_row.actual_columns}: {invalid_row.text}",
ParserWarning,
stacklevel=find_stack_level(),
)
return "skip"

self.parse_options["invalid_row_handler"] = handle_warning
elif on_bad_lines == ParserBase.BadLineHandleMethod.SKIP:
self.parse_options["invalid_row_handler"] = lambda _: "skip"

self.convert_options = {
option_name: option_value
for option_name, option_value in self.kwds.items()
Expand Down
13 changes: 10 additions & 3 deletions pandas/io/parsers/readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,13 @@
expected, a ``ParserWarning`` will be emitted while dropping extra elements.
Only supported when ``engine='python'``

.. versionchanged:: 2.2.0

- Callable, function with signature
as described in `pyarrow documentation
<https://arrow.apache.org/docs/python/generated/pyarrow.csv.ParseOptions.html
#pyarrow.csv.ParseOptions.invalid_row_handler>_` when ``engine='pyarrow'``

delim_whitespace : bool, default False
Specifies whether or not whitespace (e.g. ``' '`` or ``'\\t'``) will be
used as the ``sep`` delimiter. Equivalent to setting ``sep='\\s+'``. If this option
Expand Down Expand Up @@ -484,7 +491,6 @@ class _Fwf_Defaults(TypedDict):
"thousands",
"memory_map",
"dialect",
"on_bad_lines",
"delim_whitespace",
"quoting",
"lineterminator",
Expand Down Expand Up @@ -2053,9 +2059,10 @@ def _refine_defaults_read(
elif on_bad_lines == "skip":
kwds["on_bad_lines"] = ParserBase.BadLineHandleMethod.SKIP
elif callable(on_bad_lines):
if engine != "python":
if engine not in ["python", "pyarrow"]:
raise ValueError(
"on_bad_line can only be a callable function if engine='python'"
"on_bad_line can only be a callable function "
"if engine='python' or 'pyarrow'"
)
kwds["on_bad_lines"] = on_bad_lines
else:
Expand Down
10 changes: 7 additions & 3 deletions pandas/tests/io/parser/test_unsupported.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,17 @@ def test_pyarrow_engine(self):
with pytest.raises(ValueError, match=msg):
read_csv(StringIO(data), engine="pyarrow", **kwargs)

def test_on_bad_lines_callable_python_only(self, all_parsers):
def test_on_bad_lines_callable_python_or_pyarrow(self, all_parsers):
# GH 5686
# GH 54643
sio = StringIO("a,b\n1,2")
bad_lines_func = lambda x: x
parser = all_parsers
if all_parsers.engine != "python":
msg = "on_bad_line can only be a callable function if engine='python'"
if all_parsers.engine not in ["python", "pyarrow"]:
msg = (
"on_bad_line can only be a callable "
"function if engine='python' or 'pyarrow'"
)
with pytest.raises(ValueError, match=msg):
parser.read_csv(sio, on_bad_lines=bad_lines_func)
else:
Expand Down