Skip to content

Commit

Permalink
feat: implement option 'delete_rows' of argument 'if_exists' in 'Data…
Browse files Browse the repository at this point in the history
…Frame.to_sql' API.
  • Loading branch information
gmcrocetti committed Dec 27, 2024
1 parent 59b3a1a commit 2eb19e7
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 12 deletions.
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Other enhancements
- :meth:`Series.map` can now accept kwargs to pass on to func (:issue:`59814`)
- :meth:`pandas.concat` will raise a ``ValueError`` when ``ignore_index=True`` and ``keys`` is not ``None`` (:issue:`59274`)
- :meth:`str.get_dummies` now accepts a ``dtype`` parameter to specify the dtype of the resulting DataFrame (:issue:`47872`)
- Add ``"delete_rows"`` option to ``if_exists`` argument in :meth:`DataFrame.to_sql` deleting all records of the table before inserting data (:issue:`37210`).
- Multiplying two :class:`DateOffset` objects will now raise a ``TypeError`` instead of a ``RecursionError`` (:issue:`59442`)
- Restore support for reading Stata 104-format and enable reading 103-format dta files (:issue:`58554`)
- Support passing a :class:`Iterable[Hashable]` input to :meth:`DataFrame.drop_duplicates` (:issue:`59237`)
Expand Down
56 changes: 45 additions & 11 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ def to_sql(
name: str,
con,
schema: str | None = None,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label: IndexLabel | None = None,
chunksize: int | None = None,
Expand All @@ -764,10 +764,11 @@ def to_sql(
schema : str, optional
Name of SQL schema in database to write to (if database flavor
supports this). If None, use default schema (default).
if_exists : {'fail', 'replace', 'append'}, default 'fail'
if_exists : {'fail', 'replace', 'append', 'delete_rows'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- delete_rows: If a table exists, delete all records and insert data.
index : bool, default True
Write DataFrame index as a column.
index_label : str or sequence, optional
Expand Down Expand Up @@ -818,7 +819,7 @@ def to_sql(
`sqlite3 <https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.rowcount>`__ or
`SQLAlchemy <https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount>`__
""" # noqa: E501
if if_exists not in ("fail", "replace", "append"):
if if_exists not in ("fail", "replace", "append", "delete_rows"):
raise ValueError(f"'{if_exists}' is not valid for if_exists")

if isinstance(frame, Series):
Expand Down Expand Up @@ -926,7 +927,7 @@ def __init__(
pandas_sql_engine,
frame=None,
index: bool | str | list[str] | None = True,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
prefix: str = "pandas",
index_label=None,
schema=None,
Expand Down Expand Up @@ -974,11 +975,13 @@ def create(self) -> None:
if self.exists():
if self.if_exists == "fail":
raise ValueError(f"Table '{self.name}' already exists.")
if self.if_exists == "replace":
elif self.if_exists == "replace":
self.pd_sql.drop_table(self.name, self.schema)
self._execute_create()
elif self.if_exists == "append":
pass
elif self.if_exists == "delete_rows":
self.pd_sql.delete_rows(self.name, self.schema)
else:
raise ValueError(f"'{self.if_exists}' is not valid for if_exists")
else:
Expand Down Expand Up @@ -1480,7 +1483,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label=None,
schema=None,
Expand Down Expand Up @@ -1866,7 +1869,7 @@ def prep_table(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool | str | list[str] | None = True,
index_label=None,
schema=None,
Expand Down Expand Up @@ -1943,7 +1946,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label=None,
schema: str | None = None,
Expand All @@ -1961,10 +1964,11 @@ def to_sql(
frame : DataFrame
name : string
Name of SQL table.
if_exists : {'fail', 'replace', 'append'}, default 'fail'
if_exists : {'fail', 'replace', 'append', 'delete_rows'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- delete_rows: If a table exists, delete all records and insert data.
index : boolean, default True
Write DataFrame index as a column.
index_label : string or sequence, default None
Expand Down Expand Up @@ -2061,6 +2065,18 @@ def drop_table(self, table_name: str, schema: str | None = None) -> None:
self.get_table(table_name, schema).drop(bind=self.con)
self.meta.clear()

def delete_rows(self, table_name: str, schema: str | None = None) -> None:
schema = schema or self.meta.schema
if self.has_table(table_name, schema):
self.meta.reflect(
bind=self.con, only=[table_name], schema=schema, views=True
)
with self.run_transaction() as con:
table = self.get_table(table_name, schema)
con.execute(table.delete())

self.meta.clear()

def _create_sql_schema(
self,
frame: DataFrame,
Expand Down Expand Up @@ -2296,7 +2312,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label=None,
schema: str | None = None,
Expand All @@ -2318,6 +2334,7 @@ def to_sql(
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- delete_rows: If a table exists, delete all records and insert data.
index : boolean, default True
Write DataFrame index as a column.
index_label : string or sequence, default None
Expand All @@ -2335,6 +2352,7 @@ def to_sql(
engine : {'auto', 'sqlalchemy'}, default 'auto'
Raises NotImplementedError if not set to 'auto'
"""

if index_label:
raise NotImplementedError(
"'index_label' is not implemented for ADBC drivers"
Expand Down Expand Up @@ -2368,6 +2386,9 @@ def to_sql(
cur.execute(f"DROP TABLE {table_name}")
elif if_exists == "append":
mode = "append"
elif if_exists == "delete_rows":
mode = "append"
self.delete_rows(name, schema)

import pyarrow as pa

Expand Down Expand Up @@ -2402,6 +2423,12 @@ def has_table(self, name: str, schema: str | None = None) -> bool:

return False

def delete_rows(self, name: str, schema: str | None = None) -> None:
delete_sql = f"DELETE FROM {schema}.{name}" if schema else f"DELETE FROM {name}"
if self.has_table(name, schema):
with self.con.cursor() as cur:
cur.execute(delete_sql)

def _create_sql_schema(
self,
frame: DataFrame,
Expand Down Expand Up @@ -2769,10 +2796,11 @@ def to_sql(
frame: DataFrame
name: string
Name of SQL table.
if_exists: {'fail', 'replace', 'append'}, default 'fail'
if_exists: {'fail', 'replace', 'append', 'delete_rows'}, default 'fail'
fail: If table exists, do nothing.
replace: If table exists, drop it, recreate it, and insert data.
append: If table exists, insert data. Create if it does not exist.
delete_rows: If a table exists, delete all records and insert data.
index : bool, default True
Write DataFrame index as a column
index_label : string or sequence, default None
Expand Down Expand Up @@ -2848,6 +2876,12 @@ def drop_table(self, name: str, schema: str | None = None) -> None:
drop_sql = f"DROP TABLE {_get_valid_sqlite_name(name)}"
self.execute(drop_sql)

def delete_rows(self, name: str, schema: str | None = None) -> None:
delete_sql = f"DELETE FROM {_get_valid_sqlite_name(name)}"
if self.has_table(name, schema):
with self.run_transaction() as cur:
cur.execute(delete_sql)

def _create_sql_schema(
self,
frame,
Expand Down
62 changes: 61 additions & 1 deletion pandas/tests/io/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,9 @@ def test_to_sql(conn, method, test_frame1, request):


@pytest.mark.parametrize("conn", all_connectable)
@pytest.mark.parametrize("mode, num_row_coef", [("replace", 1), ("append", 2)])
@pytest.mark.parametrize(
"mode, num_row_coef", [("replace", 1), ("append", 2), ("delete_rows", 1)]
)
def test_to_sql_exist(conn, mode, num_row_coef, test_frame1, request):
conn = request.getfixturevalue(conn)
with pandasSQL_builder(conn, need_transaction=True) as pandasSQL:
Expand Down Expand Up @@ -2698,6 +2700,64 @@ def test_drop_table(conn, request):
assert not insp.has_table("temp_frame")


@pytest.mark.parametrize("conn", all_connectable)
def test_delete_rows_success(conn, test_frame1, request):
table_name = "temp_frame"
conn = request.getfixturevalue(conn)
pandasSQL = pandasSQL_builder(conn)

with pandasSQL.run_transaction():
assert pandasSQL.to_sql(test_frame1, table_name) == test_frame1.shape[0]

with pandasSQL.run_transaction():
assert pandasSQL.delete_rows(table_name) is None

assert count_rows(conn, table_name) == 0
assert pandasSQL.has_table("temp_frame")


@pytest.mark.parametrize("conn", all_connectable)
def test_delete_rows_is_atomic(conn, request):
adbc_driver_manager = pytest.importorskip("adbc_driver_manager")
sqlalchemy = pytest.importorskip("sqlalchemy")

if "sqlite" in conn:
reason = "This test relies on strict column types, SQLite has a dynamic one"
request.applymarker(
pytest.mark.xfail(
reason=reason,
strict=True,
)
)

table_name = "temp_frame"
original_df = DataFrame({"a": [1, 2, 3]})
replacing_df = DataFrame({"a": ["a", "b", "c", "d"]})

conn = request.getfixturevalue(conn)
pandasSQL = pandasSQL_builder(conn)

if isinstance(conn, adbc_driver_manager.dbapi.Connection):
expected_exception = adbc_driver_manager.ProgrammingError
else:
expected_exception = sqlalchemy.exc.DataError

with pandasSQL.run_transaction():
pandasSQL.to_sql(original_df, table_name, if_exists="fail", index=False)

# trying to insert strings in an integer column
with pytest.raises(expected_exception):
with pandasSQL.run_transaction():
pandasSQL.to_sql(
replacing_df, table_name, if_exists="delete_rows", index=False
)

# failed "delete_rows" is rolled back preserving original data
with pandasSQL.run_transaction():
result_df = pandasSQL.read_query(f"SELECT * FROM {table_name}")
tm.assert_frame_equal(result_df, original_df)


@pytest.mark.parametrize("conn", all_connectable)
def test_roundtrip(conn, request, test_frame1):
if conn == "sqlite_str":
Expand Down

0 comments on commit 2eb19e7

Please sign in to comment.