Skip to content

Commit

Permalink
Make test_sql.py other connectable tests parallelizable 3
Browse files (browse the repository at this point in the history)
  • Loading branch information
UmbertoFasci committed Dec 20, 2024
1 parent 5250e1b commit 6d73fa0
Showing 1 changed file with 50 additions and 49 deletions.
99 changes: 50 additions & 49 deletions pandas/tests/io/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
pytest.mark.single_cpu,
]


def table_uuid_gen(prefix: str) -> str:
    """
    Generate a unique table name with context prefix.

    Parameters
    ----------
    prefix : str
        Label placed in front of the random part of the name.

    Returns
    -------
    str
        ``prefix``, an underscore, and the hex form of a freshly generated
        ``uuid.uuid4()``.
    """
    # A new random UUID per call keeps names distinct between callers.
    return f"{prefix}_{uuid.uuid4().hex}"
Expand Down Expand Up @@ -1370,9 +1371,7 @@ def insert_on_conflict(table, conn, keys, data_iter):
conn.execute(create_sql)

expected = DataFrame([[1, 2.1, "a"]], columns=list("abc"))
expected.to_sql(
name=table_uuid, con=conn, if_exists="append", index=False
)
expected.to_sql(name=table_uuid, con=conn, if_exists="append", index=False)

df_insert = DataFrame([[1, 3.2, "b"]], columns=list("abc"))
inserted = df_insert.to_sql(
Expand Down Expand Up @@ -2029,7 +2028,7 @@ def test_api_to_sql_index_label_multiindex(conn, request):
# no index name, defaults to 'level_0' and 'level_1'
result = sql.to_sql(temp_frame, table_uuid, conn)
assert result == expected_row_count
frame = sql.read_sql_query(f"SELECT * FROM table_uuid", conn)
# Interpolate the generated name: the frame was written to the table named by
# the ``table_uuid`` variable, not to a table literally called "table_uuid".
frame = sql.read_sql_query(f"SELECT * FROM {table_uuid}", conn)
assert frame.columns[0] == "level_0"
assert frame.columns[1] == "level_1"

Expand Down Expand Up @@ -2061,7 +2060,7 @@ def test_api_to_sql_index_label_multiindex(conn, request):
index_label=["C", "D"],
)
assert result == expected_row_count
frame = sql.read_sql_query(f"SELECT * FROM table_uuid", conn)
# Query the table named by the ``table_uuid`` variable rather than the literal
# text "table_uuid", which is not a table this test creates.
frame = sql.read_sql_query(f"SELECT * FROM {table_uuid}", conn)
assert frame.columns[:2].tolist() == ["C", "D"]

msg = "Length of 'index_label' should match number of levels, which is 2"
Expand Down Expand Up @@ -2562,7 +2561,9 @@ def test_database_uri_string(conn, request, test_frame1):
with tm.ensure_clean() as name:
db_uri = "sqlite:///" + name
table_uuid = table_uuid_gen("iris")
test_frame1.to_sql(name=table_uuid, con=db_uri, if_exists="replace", index=False)
test_frame1.to_sql(
name=table_uuid, con=db_uri, if_exists="replace", index=False
)
test_frame2 = sql.read_sql(table_uuid, db_uri)
test_frame3 = sql.read_sql_table(table_uuid, db_uri)
query = f"SELECT * FROM {table_uuid}"
Expand Down Expand Up @@ -3318,7 +3319,7 @@ def test_dtype(conn, request):
df = DataFrame(data, columns=cols)

table_uuid1 = table_uuid_gen("dtype_test")
table_uuid2 = table_uuid_gen("dtype_test2")
table_uuid2 = table_uuid_gen("dtype_test2")
table_uuid3 = table_uuid_gen("dtype_test3")
table_uuid_single = table_uuid_gen("single_dtype_test")
error_table = table_uuid_gen("error")
Expand Down Expand Up @@ -3470,8 +3471,7 @@ def main(connectable):
test_connectable(connectable)

assert (
DataFrame({"test_foo_data": [0, 1, 2]}).to_sql(name=table_uuid, con=conn)
== 3
DataFrame({"test_foo_data": [0, 1, 2]}).to_sql(name=table_uuid, con=conn) == 3
)
main(conn)

Expand Down Expand Up @@ -3900,8 +3900,7 @@ class Test(BaseModel):
with Session() as session:
df = DataFrame({"id": [0, 1], "string_column": ["hello", "world"]})
assert (
df.to_sql(name=table_uuid, con=conn, index=False, if_exists="replace")
== 2
df.to_sql(name=table_uuid, con=conn, index=False, if_exists="replace") == 2
)
session.commit()
test_query = session.query(Test.id, Test.string_column)
Expand Down Expand Up @@ -3986,9 +3985,7 @@ def test_psycopg2_schema_support(postgresql_psycopg2_engine):
)
== 2
)
assert (
df.to_sql(name=schema_other_uuid, con=conn, index=False, schema="other") == 2
)
assert df.to_sql(name=schema_other_uuid, con=conn, index=False, schema="other") == 2

# read dataframes back in
res1 = sql.read_sql_table(schema_public_uuid, conn)
Expand All @@ -4012,9 +4009,7 @@ def test_psycopg2_schema_support(postgresql_psycopg2_engine):
con.exec_driver_sql("CREATE SCHEMA other;")

# write dataframe with different if_exists options
assert (
df.to_sql(name=schema_other_uuid, con=conn, schema="other", index=False) == 2
)
assert df.to_sql(name=schema_other_uuid, con=conn, schema="other", index=False) == 2
df.to_sql(
name=schema_other_uuid,
con=conn,
Expand Down Expand Up @@ -4042,27 +4037,25 @@ def test_self_join_date_columns(postgresql_psycopg2_engine):
conn = postgresql_psycopg2_engine
from sqlalchemy.sql import text

table_uuid = table_uuid_gen("person")
tb = table_uuid_gen("person")

create_table = text(
f"""
CREATE TABLE {table_uuid}
CREATE TABLE {tb}
(
id serial constraint {table_uuid}_pkey primary key,
id serial constraint {tb}_pkey primary key,
created_dt timestamp with time zone
);
INSERT INTO {table_uuid}
INSERT INTO {tb}
VALUES (1, '2021-01-01T00:00:00Z');
"""
)
with conn.connect() as con:
with con.begin():
con.execute(create_table)

sql_query = (
f'SELECT * FROM "{table_uuid}" AS p1 INNER JOIN "{table_uuid}" AS p2 ON p1.id = p2.id;'
)
sql_query = f'SELECT * FROM "{tb}" AS p1 INNER JOIN "{tb}" AS p2 ON p1.id = p2.id;'
result = pd.read_sql(sql_query, conn)
expected = DataFrame(
[[1, Timestamp("2021", tz="UTC")] * 2], columns=["id", "created_dt"] * 2
Expand All @@ -4072,7 +4065,7 @@ def test_self_join_date_columns(postgresql_psycopg2_engine):

# Cleanup
with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
pandasSQL.drop_table(table_uuid)
pandasSQL.drop_table(tb)


def test_create_and_drop_table(sqlite_engine):
Expand Down Expand Up @@ -4258,7 +4251,9 @@ def test_xsqlite_basic(sqlite_buildin):
new_idx = Index(np.arange(len(frame2)), dtype=np.int64) + 10
frame2["Idx"] = new_idx.copy()
assert sql.to_sql(frame2, name=table_uuid2, con=sqlite_buildin, index=False) == 10
result = sql.read_sql(f"select * from {table_uuid2}", sqlite_buildin, index_col="Idx")
result = sql.read_sql(
f"select * from {table_uuid2}", sqlite_buildin, index_col="Idx"
)
expected = frame.copy()
expected.index = new_idx
expected.index.name = "Idx"
Expand All @@ -4271,19 +4266,20 @@ def test_xsqlite_write_row_by_row(sqlite_buildin):
columns=Index(list("ABCD")),
index=date_range("2000-01-01", periods=10, freq="B"),
)
table_uuid = table_uuid_gen("test")
frame.iloc[0, 0] = np.nan
create_sql = sql.get_schema(frame, "test")
create_sql = sql.get_schema(frame, table_uuid)
cur = sqlite_buildin.cursor()
cur.execute(create_sql)

ins = "INSERT INTO test VALUES (%s, %s, %s, %s)"
ins = f"INSERT INTO {table_uuid} VALUES (%s, %s, %s, %s)"
for _, row in frame.iterrows():
fmt_sql = format_query(ins, *row)
tquery(fmt_sql, con=sqlite_buildin)

sqlite_buildin.commit()

result = sql.read_sql("select * from test", con=sqlite_buildin)
result = sql.read_sql(f"select * from {table_uuid}", con=sqlite_buildin)
result.index = frame.index
tm.assert_frame_equal(result, frame, rtol=1e-3)

Expand All @@ -4294,17 +4290,18 @@ def test_xsqlite_execute(sqlite_buildin):
columns=Index(list("ABCD")),
index=date_range("2000-01-01", periods=10, freq="B"),
)
create_sql = sql.get_schema(frame, "test")
table_uuid = table_uuid_gen("test")
create_sql = sql.get_schema(frame, table_uuid)
cur = sqlite_buildin.cursor()
cur.execute(create_sql)
ins = "INSERT INTO test VALUES (?, ?, ?, ?)"
ins = f"INSERT INTO {table_uuid} VALUES (?, ?, ?, ?)"

row = frame.iloc[0]
with sql.pandasSQL_builder(sqlite_buildin) as pandas_sql:
pandas_sql.execute(ins, tuple(row))
sqlite_buildin.commit()

result = sql.read_sql("select * from test", sqlite_buildin)
result = sql.read_sql(f"select * from {table_uuid}", sqlite_buildin)
result.index = frame.index[:1]
tm.assert_frame_equal(result, frame[:1])

Expand All @@ -4315,23 +4312,25 @@ def test_xsqlite_schema(sqlite_buildin):
columns=Index(list("ABCD")),
index=date_range("2000-01-01", periods=10, freq="B"),
)
create_sql = sql.get_schema(frame, "test")
table_uuid = table_uuid_gen("test")
create_sql = sql.get_schema(frame, table_uuid)
lines = create_sql.splitlines()
for line in lines:
tokens = line.split(" ")
if len(tokens) == 2 and tokens[0] == "A":
assert tokens[1] == "DATETIME"

create_sql = sql.get_schema(frame, "test", keys=["A", "B"])
create_sql = sql.get_schema(frame, table_uuid, keys=["A", "B"])
lines = create_sql.splitlines()
assert 'PRIMARY KEY ("A", "B")' in create_sql
cur = sqlite_buildin.cursor()
cur.execute(create_sql)


def test_xsqlite_execute_fail(sqlite_buildin):
create_sql = """
CREATE TABLE test
table_uuid = table_uuid_gen("test")
create_sql = f"""
CREATE TABLE {table_uuid}
(
a TEXT,
b TEXT,
Expand All @@ -4343,16 +4342,17 @@ def test_xsqlite_execute_fail(sqlite_buildin):
cur.execute(create_sql)

with sql.pandasSQL_builder(sqlite_buildin) as pandas_sql:
pandas_sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)')
pandas_sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)')
pandas_sql.execute(f'INSERT INTO {table_uuid} VALUES("foo", "bar", 1.234)')
pandas_sql.execute(f'INSERT INTO {table_uuid} VALUES("foo", "baz", 2.567)')

with pytest.raises(sql.DatabaseError, match="Execution failed on sql"):
pandas_sql.execute('INSERT INTO test VALUES("foo", "bar", 7)')
pandas_sql.execute(f'INSERT INTO {table_uuid} VALUES("foo", "bar", 7)')


def test_xsqlite_execute_closed_connection():
create_sql = """
CREATE TABLE test
table_uuid = table_uuid_gen("test")
create_sql = f"""
CREATE TABLE {table_uuid}
(
a TEXT,
b TEXT,
Expand All @@ -4365,38 +4365,39 @@ def test_xsqlite_execute_closed_connection():
cur.execute(create_sql)

with sql.pandasSQL_builder(conn) as pandas_sql:
pandas_sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)')
pandas_sql.execute(f'INSERT INTO {table_uuid} VALUES("foo", "bar", 1.234)')

msg = "Cannot operate on a closed database."
with pytest.raises(sqlite3.ProgrammingError, match=msg):
tquery("select * from test", con=conn)
tquery(f"select * from {table_uuid}", con=conn)


def test_xsqlite_keyword_as_column_names(sqlite_buildin):
table_uuid = table_uuid_gen("testkeywords")
df = DataFrame({"From": np.ones(5)})
assert sql.to_sql(df, con=sqlite_buildin, name="testkeywords", index=False) == 5
assert sql.to_sql(df, con=sqlite_buildin, name=table_uuid, index=False) == 5


def test_xsqlite_onecolumn_of_integer(sqlite_buildin):
# GH 3628
# a column_of_integers dataframe should transfer well to sql

table_uuid = table_uuid_gen("mono_df")
mono_df = DataFrame([1, 2], columns=["c0"])
assert sql.to_sql(mono_df, con=sqlite_buildin, name="mono_df", index=False) == 2
assert sql.to_sql(mono_df, con=sqlite_buildin, name=table_uuid, index=False) == 2
# computing the sum via sql
con_x = sqlite_buildin
the_sum = sum(my_c0[0] for my_c0 in con_x.execute("select * from mono_df"))
the_sum = sum(my_c0[0] for my_c0 in con_x.execute(f"select * from {table_uuid}"))
# it should not fail, and gives 3 ( Issue #3628 )
assert the_sum == 3

result = sql.read_sql("select * from mono_df", con_x)
result = sql.read_sql(f"select * from {table_uuid}", con_x)
tm.assert_frame_equal(result, mono_df)


def test_xsqlite_if_exists(sqlite_buildin):
df_if_exists_1 = DataFrame({"col1": [1, 2], "col2": ["A", "B"]})
df_if_exists_2 = DataFrame({"col1": [3, 4, 5], "col2": ["C", "D", "E"]})
table_name = "table_if_exists"
table_name = table_uuid_gen("table_if_exists")
sql_select = f"SELECT * FROM {table_name}"

msg = "'notvalidvalue' is not valid for if_exists"
Expand Down

0 comments on commit 6d73fa0

Please sign in to comment.