From 6d73fa0e5678caf4163ddb6db5b024f64e712f76 Mon Sep 17 00:00:00 2001
From: Umberto Fasci
Date: Fri, 20 Dec 2024 13:42:51 -0600
Subject: [PATCH] Make test_sql.py other connectable tests parallelizable 3

Replace the remaining hard-coded table names in these tests with unique
names from the table_uuid_gen helper so the tests no longer collide on
shared tables when run in parallel.

---
 pandas/tests/io/test_sql.py | 99 +++++++++++++++++++------------------
 1 file changed, 50 insertions(+), 49 deletions(-)

diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py
index 9fae43d2ad1fe..020ed80c06427 100644
--- a/pandas/tests/io/test_sql.py
+++ b/pandas/tests/io/test_sql.py
@@ -63,6 +63,7 @@
     pytest.mark.single_cpu,
 ]
 
+
 def table_uuid_gen(prefix: str) -> str:
     """Generate a unique table name with context prefix."""
     return f"{prefix}_{uuid.uuid4().hex}"
@@ -1370,9 +1371,7 @@ def insert_on_conflict(table, conn, keys, data_iter):
         conn.execute(create_sql)
 
     expected = DataFrame([[1, 2.1, "a"]], columns=list("abc"))
-    expected.to_sql(
-        name=table_uuid, con=conn, if_exists="append", index=False
-    )
+    expected.to_sql(name=table_uuid, con=conn, if_exists="append", index=False)
 
     df_insert = DataFrame([[1, 3.2, "b"]], columns=list("abc"))
     inserted = df_insert.to_sql(
@@ -2029,7 +2028,7 @@ def test_api_to_sql_index_label_multiindex(conn, request):
     # no index name, defaults to 'level_0' and 'level_1'
     result = sql.to_sql(temp_frame, table_uuid, conn)
     assert result == expected_row_count
-    frame = sql.read_sql_query(f"SELECT * FROM table_uuid", conn)
+    frame = sql.read_sql_query(f"SELECT * FROM {table_uuid}", conn)
     assert frame.columns[0] == "level_0"
     assert frame.columns[1] == "level_1"
 
@@ -2061,7 +2060,7 @@ def test_api_to_sql_index_label_multiindex(conn, request):
         index_label=["C", "D"],
     )
     assert result == expected_row_count
-    frame = sql.read_sql_query(f"SELECT * FROM table_uuid", conn)
+    frame = sql.read_sql_query(f"SELECT * FROM {table_uuid}", conn)
     assert frame.columns[:2].tolist() == ["C", "D"]
 
     msg = "Length of 'index_label' should match number of levels, which is 2"
@@ -2562,7 +2561,9 @@ def test_database_uri_string(conn, request, test_frame1):
     with tm.ensure_clean() as name:
         db_uri = "sqlite:///" + name
         table_uuid = table_uuid_gen("iris")
-        test_frame1.to_sql(name=table_uuid, con=db_uri, if_exists="replace", index=False)
+        test_frame1.to_sql(
+            name=table_uuid, con=db_uri, if_exists="replace", index=False
+        )
         test_frame2 = sql.read_sql(table_uuid, db_uri)
         test_frame3 = sql.read_sql_table(table_uuid, db_uri)
         query = f"SELECT * FROM {table_uuid}"
@@ -3318,7 +3319,7 @@ def test_dtype(conn, request):
     df = DataFrame(data, columns=cols)
 
     table_uuid1 = table_uuid_gen("dtype_test")
-    table_uuid2 = table_uuid_gen("dtype_test2") 
+    table_uuid2 = table_uuid_gen("dtype_test2")
     table_uuid3 = table_uuid_gen("dtype_test3")
     table_uuid_single = table_uuid_gen("single_dtype_test")
     error_table = table_uuid_gen("error")
@@ -3470,8 +3471,7 @@ def main(connectable):
         test_connectable(connectable)
 
     assert (
-        DataFrame({"test_foo_data": [0, 1, 2]}).to_sql(name=table_uuid, con=conn)
-        == 3
+        DataFrame({"test_foo_data": [0, 1, 2]}).to_sql(name=table_uuid, con=conn) == 3
     )
 
     main(conn)
@@ -3900,8 +3900,7 @@ class Test(BaseModel):
     with Session() as session:
         df = DataFrame({"id": [0, 1], "string_column": ["hello", "world"]})
         assert (
-            df.to_sql(name=table_uuid, con=conn, index=False, if_exists="replace")
-            == 2
+            df.to_sql(name=table_uuid, con=conn, index=False, if_exists="replace") == 2
         )
         session.commit()
         test_query = session.query(Test.id, Test.string_column)
@@ -3986,9 +3985,7 @@ def test_psycopg2_schema_support(postgresql_psycopg2_engine):
         )
         == 2
     )
-    assert (
-        df.to_sql(name=schema_other_uuid, con=conn, index=False, schema="other") == 2
-    )
+    assert df.to_sql(name=schema_other_uuid, con=conn, index=False, schema="other") == 2
 
     # read dataframes back in
     res1 = sql.read_sql_table(schema_public_uuid, conn)
@@ -4012,9 +4009,7 @@ def test_psycopg2_schema_support(postgresql_psycopg2_engine):
         con.exec_driver_sql("CREATE SCHEMA other;")
 
     # write dataframe with different if_exists options
-    assert (
-        df.to_sql(name=schema_other_uuid, con=conn, schema="other", index=False) == 2
-    )
+    assert df.to_sql(name=schema_other_uuid, con=conn, schema="other", index=False) == 2
     df.to_sql(
         name=schema_other_uuid,
         con=conn,
@@ -4042,17 +4037,17 @@ def test_self_join_date_columns(postgresql_psycopg2_engine):
     conn = postgresql_psycopg2_engine
 
     from sqlalchemy.sql import text
 
-    table_uuid = table_uuid_gen("person")
+    tb = table_uuid_gen("person")
     create_table = text(
         f"""
-    CREATE TABLE {table_uuid}
+    CREATE TABLE {tb}
     (
-        id serial constraint {table_uuid}_pkey primary key,
+        id serial constraint {tb}_pkey primary key,
         created_dt timestamp with time zone
     );
 
-    INSERT INTO {table_uuid}
+    INSERT INTO {tb}
     VALUES (1, '2021-01-01T00:00:00Z');
     """
     )
@@ -4060,9 +4055,7 @@ def test_self_join_date_columns(postgresql_psycopg2_engine):
     with con.begin():
         con.execute(create_table)
 
-    sql_query = (
-        f'SELECT * FROM "{table_uuid}" AS p1 INNER JOIN "{table_uuid}" AS p2 ON p1.id = p2.id;'
-    )
+    sql_query = f'SELECT * FROM "{tb}" AS p1 INNER JOIN "{tb}" AS p2 ON p1.id = p2.id;'
     result = pd.read_sql(sql_query, conn)
     expected = DataFrame(
         [[1, Timestamp("2021", tz="UTC")] * 2], columns=["id", "created_dt"] * 2
@@ -4072,7 +4065,7 @@ def test_self_join_date_columns(postgresql_psycopg2_engine):
 
     # Cleanup
     with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
-        pandasSQL.drop_table(table_uuid)
+        pandasSQL.drop_table(tb)
 
 
 def test_create_and_drop_table(sqlite_engine):
@@ -4258,7 +4251,9 @@ def test_xsqlite_basic(sqlite_buildin):
     new_idx = Index(np.arange(len(frame2)), dtype=np.int64) + 10
     frame2["Idx"] = new_idx.copy()
     assert sql.to_sql(frame2, name=table_uuid2, con=sqlite_buildin, index=False) == 10
-    result = sql.read_sql(f"select * from {table_uuid2}", sqlite_buildin, index_col="Idx")
+    result = sql.read_sql(
+        f"select * from {table_uuid2}", sqlite_buildin, index_col="Idx"
+    )
     expected = frame.copy()
     expected.index = new_idx
     expected.index.name = "Idx"
@@ -4271,19 +4266,20 @@ def test_xsqlite_write_row_by_row(sqlite_buildin):
         columns=Index(list("ABCD")),
         index=date_range("2000-01-01", periods=10, freq="B"),
     )
+    table_uuid = table_uuid_gen("test")
     frame.iloc[0, 0] = np.nan
-    create_sql = sql.get_schema(frame, "test")
+    create_sql = sql.get_schema(frame, table_uuid)
     cur = sqlite_buildin.cursor()
     cur.execute(create_sql)
 
-    ins = "INSERT INTO test VALUES (%s, %s, %s, %s)"
+    ins = f"INSERT INTO {table_uuid} VALUES (%s, %s, %s, %s)"
     for _, row in frame.iterrows():
         fmt_sql = format_query(ins, *row)
         tquery(fmt_sql, con=sqlite_buildin)
 
     sqlite_buildin.commit()
 
-    result = sql.read_sql("select * from test", con=sqlite_buildin)
+    result = sql.read_sql(f"select * from {table_uuid}", con=sqlite_buildin)
     result.index = frame.index
     tm.assert_frame_equal(result, frame, rtol=1e-3)
 
@@ -4294,17 +4290,18 @@ def test_xsqlite_execute(sqlite_buildin):
         columns=Index(list("ABCD")),
         index=date_range("2000-01-01", periods=10, freq="B"),
     )
-    create_sql = sql.get_schema(frame, "test")
+    table_uuid = table_uuid_gen("test")
+    create_sql = sql.get_schema(frame, table_uuid)
     cur = sqlite_buildin.cursor()
     cur.execute(create_sql)
 
-    ins = "INSERT INTO test VALUES (?, ?, ?, ?)"
+    ins = f"INSERT INTO {table_uuid} VALUES (?, ?, ?, ?)"
     row = frame.iloc[0]
 
     with sql.pandasSQL_builder(sqlite_buildin) as pandas_sql:
         pandas_sql.execute(ins, tuple(row))
     sqlite_buildin.commit()
 
-    result = sql.read_sql("select * from test", sqlite_buildin)
+    result = sql.read_sql(f"select * from {table_uuid}", sqlite_buildin)
     result.index = frame.index[:1]
     tm.assert_frame_equal(result, frame[:1])
 
@@ -4315,14 +4312,15 @@ def test_xsqlite_schema(sqlite_buildin):
         columns=Index(list("ABCD")),
         index=date_range("2000-01-01", periods=10, freq="B"),
     )
-    create_sql = sql.get_schema(frame, "test")
+    table_uuid = table_uuid_gen("test")
+    create_sql = sql.get_schema(frame, table_uuid)
     lines = create_sql.splitlines()
     for line in lines:
         tokens = line.split(" ")
         if len(tokens) == 2 and tokens[0] == "A":
             assert tokens[1] == "DATETIME"
 
-    create_sql = sql.get_schema(frame, "test", keys=["A", "B"])
+    create_sql = sql.get_schema(frame, table_uuid, keys=["A", "B"])
     lines = create_sql.splitlines()
     assert 'PRIMARY KEY ("A", "B")' in create_sql
     cur = sqlite_buildin.cursor()
@@ -4330,8 +4328,9 @@ def test_xsqlite_schema(sqlite_buildin):
 
 
 def test_xsqlite_execute_fail(sqlite_buildin):
-    create_sql = """
-    CREATE TABLE test
+    table_uuid = table_uuid_gen("test")
+    create_sql = f"""
+    CREATE TABLE {table_uuid}
     (
         a TEXT,
         b TEXT,
@@ -4343,16 +4342,17 @@ def test_xsqlite_execute_fail(sqlite_buildin):
     cur.execute(create_sql)
 
     with sql.pandasSQL_builder(sqlite_buildin) as pandas_sql:
-        pandas_sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)')
-        pandas_sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)')
+        pandas_sql.execute(f'INSERT INTO {table_uuid} VALUES("foo", "bar", 1.234)')
+        pandas_sql.execute(f'INSERT INTO {table_uuid} VALUES("foo", "baz", 2.567)')
 
         with pytest.raises(sql.DatabaseError, match="Execution failed on sql"):
-            pandas_sql.execute('INSERT INTO test VALUES("foo", "bar", 7)')
+            pandas_sql.execute(f'INSERT INTO {table_uuid} VALUES("foo", "bar", 7)')
 
 
 def test_xsqlite_execute_closed_connection():
-    create_sql = """
-    CREATE TABLE test
+    table_uuid = table_uuid_gen("test")
+    create_sql = f"""
+    CREATE TABLE {table_uuid}
     (
         a TEXT,
         b TEXT,
@@ -4365,38 +4365,39 @@ def test_xsqlite_execute_closed_connection():
     cur.execute(create_sql)
 
     with sql.pandasSQL_builder(conn) as pandas_sql:
-        pandas_sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)')
+        pandas_sql.execute(f'INSERT INTO {table_uuid} VALUES("foo", "bar", 1.234)')
 
     msg = "Cannot operate on a closed database."
with pytest.raises(sqlite3.ProgrammingError, match=msg): - tquery("select * from test", con=conn) + tquery(f"select * from {table_uuid}", con=conn) def test_xsqlite_keyword_as_column_names(sqlite_buildin): + table_uuid = table_uuid_gen("testkeywords") df = DataFrame({"From": np.ones(5)}) - assert sql.to_sql(df, con=sqlite_buildin, name="testkeywords", index=False) == 5 + assert sql.to_sql(df, con=sqlite_buildin, name=table_uuid, index=False) == 5 def test_xsqlite_onecolumn_of_integer(sqlite_buildin): # GH 3628 # a column_of_integers dataframe should transfer well to sql - + table_uuid = table_uuid_gen("mono_df") mono_df = DataFrame([1, 2], columns=["c0"]) - assert sql.to_sql(mono_df, con=sqlite_buildin, name="mono_df", index=False) == 2 + assert sql.to_sql(mono_df, con=sqlite_buildin, name=table_uuid, index=False) == 2 # computing the sum via sql con_x = sqlite_buildin - the_sum = sum(my_c0[0] for my_c0 in con_x.execute("select * from mono_df")) + the_sum = sum(my_c0[0] for my_c0 in con_x.execute(f"select * from {table_uuid}")) # it should not fail, and gives 3 ( Issue #3628 ) assert the_sum == 3 - result = sql.read_sql("select * from mono_df", con_x) + result = sql.read_sql(f"select * from {table_uuid}", con_x) tm.assert_frame_equal(result, mono_df) def test_xsqlite_if_exists(sqlite_buildin): df_if_exists_1 = DataFrame({"col1": [1, 2], "col2": ["A", "B"]}) df_if_exists_2 = DataFrame({"col1": [3, 4, 5], "col2": ["C", "D", "E"]}) - table_name = "table_if_exists" + table_name = table_uuid_gen("table_if_exists") sql_select = f"SELECT * FROM {table_name}" msg = "'notvalidvalue' is not valid for if_exists"