From a2cadeb1f6fbbd56d27aca127e9b35b0389d54b7 Mon Sep 17 00:00:00 2001
From: Brendan Leonard
Date: Tue, 19 May 2020 15:19:22 -0400
Subject: [PATCH 1/6] Allow Postgres schemas besides public

---
 pandas_to_postgres/_base_copy.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/pandas_to_postgres/_base_copy.py b/pandas_to_postgres/_base_copy.py
index 54307a3..29f7892 100644
--- a/pandas_to_postgres/_base_copy.py
+++ b/pandas_to_postgres/_base_copy.py
@@ -55,7 +55,10 @@ def instantiate_attrs(self, conn, table_obj):
         """
         self.conn = conn
         self.table_obj = table_obj
-        self.sql_table = table_obj.name
+        if table_obj.schema:
+            self.sql_table = f"{table_obj.schema}.{table_obj.name}"
+        else:
+            self.sql_table = table_obj.name
         self.logger = get_logger(self.sql_table)
         self.primary_key = table_obj.primary_key
         self.foreign_keys = table_obj.foreign_key_constraints

From 2ce69d5d3f7664c2bb3df57902bcfb9720b3a3a6 Mon Sep 17 00:00:00 2001
From: Brendan Leonard
Date: Tue, 19 May 2020 15:19:56 -0400
Subject: [PATCH 2/6] Bump version

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index e38c8b6..e2907be 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,7 @@ def read(fname, lines=False):
 setup(
     name="pandas_to_postgres",
-    version="v0.0.3",
+    version="v0.0.4",
     author="Brendan Leonard ",
     description=(
         "Utility to copy Pandas DataFrames and DataFrames stored in HDF5 files "

From 0bfff14d280eba55e0e1265cad7f15195878ff71 Mon Sep 17 00:00:00 2001
From: Brendan Leonard
Date: Fri, 8 Dec 2023 13:41:24 -0500
Subject: [PATCH 3/6] SQL functionality updates

---
 .gitignore                       |  1 +
 pandas_to_postgres/_base_copy.py | 28 ++++++++++++++++++++--------
 pandas_to_postgres/copy_df.py    |  8 ++++----
 pandas_to_postgres/utilities.py  |  9 +++++----
 setup.py                         |  2 +-
 5 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/.gitignore b/.gitignore
index 894a44c..c6fda16 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
+.DS_Store
 # Byte-compiled / optimized / DLL files
 __pycache__/
 *.py[cod]
diff --git a/pandas_to_postgres/_base_copy.py b/pandas_to_postgres/_base_copy.py
index 29f7892..b103608 100644
--- a/pandas_to_postgres/_base_copy.py
+++ b/pandas_to_postgres/_base_copy.py
@@ -1,6 +1,7 @@
 from .utilities import get_logger
 from sqlalchemy.schema import AddConstraint, DropConstraint
 from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.sql import text
 
 
 class BaseCopy(object):
@@ -14,7 +15,7 @@ def __init__(
         conn=None,
         table_obj=None,
         sql_table=None,
-        csv_chunksize=10 ** 6,
+        csv_chunksize=10**6,
     ):
         """
         Parameters
@@ -70,27 +71,33 @@ def drop_pk(self):
         """
         self.logger.info("Dropping {} primary key".format(self.sql_table))
         try:
-            with self.conn.begin_nested():
-                self.conn.execute(DropConstraint(self.primary_key, cascade=True))
+            self.conn.execute(DropConstraint(self.primary_key, cascade=True))
         except SQLAlchemyError:
             self.logger.info(
                 "{} primary key not found. Skipping".format(self.sql_table)
             )
+        self.conn.commit()
 
     def create_pk(self):
         """Create primary key constraints on PostgreSQL table"""
         self.logger.info("Creating {} primary key".format(self.sql_table))
-        self.conn.execute(AddConstraint(self.primary_key))
+        try:
+            self.conn.execute(AddConstraint(self.primary_key))
+        except SQLAlchemyError:
+            self.logger.warn(
+                "Error creating foreign key {}".format(self.primary_key.name)
+            )
+        self.conn.commit()
 
     def drop_fks(self):
         """Drop foreign key constraints on PostgreSQL table"""
         for fk in self.foreign_keys:
             self.logger.info("Dropping foreign key {}".format(fk.name))
             try:
-                with self.conn.begin_nested():
-                    self.conn.execute(DropConstraint(fk))
+                self.conn.execute(DropConstraint(fk))
             except SQLAlchemyError:
                 self.logger.warn("Foreign key {} not found".format(fk.name))
+        self.conn.commit()
 
     def create_fks(self):
         """Create foreign key constraints on PostgreSQL table"""
@@ -104,12 +111,16 @@ def create_fks(self):
     def truncate(self):
         """TRUNCATE PostgreSQL table"""
         self.logger.info("Truncating {}".format(self.sql_table))
-        self.conn.execute("TRUNCATE TABLE {};".format(self.sql_table))
+        self.conn.execution_options(autocommit=True).execute(
+            text("TRUNCATE TABLE {};".format(self.sql_table))
+        )
 
     def analyze(self):
         """Run ANALYZE on PostgreSQL table"""
         self.logger.info("Analyzing {}".format(self.sql_table))
-        self.conn.execute("ANALYZE {};".format(self.sql_table))
+        self.conn.execution_options(autocommit=True).execute(
+            text("ANALYZE {};".format(self.sql_table))
+        )
 
     def copy_from_file(self, file_object):
         """
@@ -123,6 +134,7 @@ def copy_from_file(self, file_object):
         cur = self.conn.connection.cursor()
         file_object.seek(0)
         columns = file_object.readline()
+
         sql = "COPY {table} ({columns}) FROM STDIN WITH CSV FREEZE".format(
             table=self.sql_table, columns=columns
         )
diff --git a/pandas_to_postgres/copy_df.py b/pandas_to_postgres/copy_df.py
index 1554c9d..798577c 100644
--- a/pandas_to_postgres/copy_df.py
+++ b/pandas_to_postgres/copy_df.py
@@ -9,7 +9,7 @@ class DataFrameCopy(BaseCopy):
     """
 
     def __init__(
-        self, df, defer_sql_objs=False, conn=None, table_obj=None, csv_chunksize=10 ** 6
+        self, df, defer_sql_objs=False, conn=None, table_obj=None, csv_chunksize=10**6
     ):
         """
         Parameters
@@ -35,12 +35,12 @@ def copy(self, functions=[cast_pandas]):
         self.drop_fks()
         self.drop_pk()
         self.df = self.data_formatting(self.df, functions=functions)
+
         with self.conn.begin():
             self.truncate()
 
             self.logger.info("Creating generator for chunking dataframe")
-            for chunk in df_generator(self.df, self.csv_chunksize):
-
+            for chunk in df_generator(self.df, self.csv_chunksize, logger=self.logger):
                 self.logger.info("Creating CSV in memory")
                 fo = create_file_object(chunk)
 
@@ -48,7 +48,7 @@ def copy(self, functions=[cast_pandas]):
                 self.copy_from_file(fo)
                 del fo
 
-            self.logger.info("All chunks copied ({} rows)".format(self.rows))
+        self.logger.info("All chunks copied ({} rows)".format(self.rows))
 
         self.create_pk()
         self.create_fks()
diff --git a/pandas_to_postgres/utilities.py b/pandas_to_postgres/utilities.py
index 464d8c4..8f55c6e 100644
--- a/pandas_to_postgres/utilities.py
+++ b/pandas_to_postgres/utilities.py
@@ -92,7 +92,7 @@ def create_file_object(df):
     return file_object
 
 
-def df_generator(df, chunksize=10 ** 6, logger=None):
+def df_generator(df, chunksize=10**6, logger=None):
     """
     Create a generator to iterate over chunks of a dataframe
 
@@ -149,9 +149,10 @@ def cast_pandas(df, columns=None, copy_obj=None, logger=None, **kwargs):
     for col in columns:
         try:
             if str(col.type) in ["INTEGER", "BIGINT"]:
-                df[col.name] = df[col.name].apply(
-                    lambda x: None if isna(x) else int(x), convert_dtype=False
-                )
+                df[col.name] = df[col.name].astype("Int64")
+                # df[col.name] = df[col.name].apply(
+                #     lambda x: None if isna(x) else int(x), convert_dtype=False
+                # )
             elif str(col.type) == "BOOLEAN":
                 df[col.name] = df[col.name].apply(
                     lambda x: None if isna(x) else bool(x), convert_dtype=False
diff --git a/setup.py b/setup.py
index e2907be..31ae324 100644
--- a/setup.py
+++ b/setup.py
@@ -20,7 +20,7 @@ def read(fname, lines=False):
     ),
     url="http://github.com/cid-harvard/pandas-to-postgres",
     packages=find_packages(),
-    install_requires=["SQLAlchemy", "pandas", "psycopg2", "tables"],
+    install_requires=["SQLAlchemy", "pandas", "psycopg2-binary", "tables", "pyarrow"],
     long_description=read("README.md"),
     classifiers=[
         "Topic :: Database",

From 9c732982c3770a06ae943f3372a7d0f6cd74f6fd Mon Sep 17 00:00:00 2001
From: Brendan Leonard
Date: Fri, 8 Dec 2023 13:41:32 -0500
Subject: [PATCH 4/6] Parquet copying updates

---
 README.md                          |  12 ++-
 pandas_to_postgres/__init__.py     |   1 +
 pandas_to_postgres/copy_parquet.py | 126 +++++++++++++++++++++++++++++
 requirements.txt                   |   1 +
 4 files changed, 137 insertions(+), 3 deletions(-)
 create mode 100644 pandas_to_postgres/copy_parquet.py

diff --git a/README.md b/README.md
index 11d5f21..6bdd6fb 100644
--- a/README.md
+++ b/README.md
@@ -8,16 +8,17 @@ Pandas-to-postgres allows you to bulk load the contents of large dataframes into
 - Removes indexing overhead by automatically detecting and dropping indexes before load, and then re-creating them afterwards
 - Allows you to load multiple separate HDF tables in parallel using multiprocessing.Pool
 - Works around pandas null value representation issues: float pandas columns that have an integer SQL type get converted into an object column with int values where applicable and NaN elsewhere.
-- Provides hooks to modify data as it's loaded
+- Provides hooks to modify data as it's loaded
 
 Anecdotally, we use this to load approximately 640 million rows of data from a 7.1GB HDF file (zlib compressed), 75% of it spread across 3 of 23 tables, with a mean number of columns of 6. We load this into an m4.xlarge RDS instance running postgres 10.3 in 54 minutes (approximately 10-15 minutes of which is recreating indexes), using 4 threads.
 
-# Dependencies
+# Dependencies
 
 - Python 3
 - psycopg2 (for the low level COPY from stdin)
 - sqlalchemy (for reflection for indexes)
 - pandas
+- pyarrow (for copying Parquet files)
 
 # Usage Example
@@ -38,9 +39,14 @@ hdf_to_postgres('./data.h5', engine_args=["psycopg://..."])
 
 # Parallel HDF from file
 hdf_to_postgres('./data.h5', engine_args=["psycopg://..."], processes=4)
+
+# Parquet file
+with db.engine.connect() as c:
+    ParquetCopy("/path/to/file.parquet", conn=c, table_obj=table_model).copy()
+
 ```
 
 # Other Comparisons
-- [Odo](http://odo.pydata.org/): A much more general tool that provides some similar features across many formats and databases, but missing a lot of our specific features. Unfortunately currently buggy and unmaintained.
+- [Odo](http://odo.pydata.org/): A much more general tool that provides some similar features across many formats and databases, but missing a lot of our specific features. Unfortunately currently buggy and unmaintained.
 - [Postgres Binary Parser](https://github.com/spitz-dan-l/postgres-binary-parser): Uses `COPY WITH BINARY` to remove the pandas to csv bottleneck, but didn't provide as good an improvement for us.
 - [pg_bulkload](https://github.com/ossc-db/pg_bulkload): The industry standard, has some overlap with us. Works extremely well if you have CSV files, but not if you have any other format (you'd have to write your own chunked read/write code and pipe it through, at which point you might as well use ours). Judging by benchmarks we're in the same ballpark. Could perhaps replace psycopg2 as our backend eventually.
diff --git a/pandas_to_postgres/__init__.py b/pandas_to_postgres/__init__.py
index cfc068d..1d75126 100644
--- a/pandas_to_postgres/__init__.py
+++ b/pandas_to_postgres/__init__.py
@@ -1,5 +1,6 @@
 from .copy_df import DataFrameCopy
 from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
+from .copy_parquet import ParquetCopy
 from .hdf_to_postgres import hdf_to_postgres, create_hdf_table_objects, copy_worker
 from .utilities import (
     hdf_metadata,
diff --git a/pandas_to_postgres/copy_parquet.py b/pandas_to_postgres/copy_parquet.py
new file mode 100644
index 0000000..41722a5
--- /dev/null
+++ b/pandas_to_postgres/copy_parquet.py
@@ -0,0 +1,126 @@
+import pyarrow.parquet as pq
+from .utilities import create_file_object, df_generator, cast_pandas
+from ._base_copy import BaseCopy
+from typing import Optional
+
+
+class ParquetCopy(BaseCopy):
+    """
+    Class for handling a standard case of reading a Parquet file into a Pandas
+    DataFrame, iterating over it in chunks, and COPYing to PostgreSQL via StringIO CSV
+    """
+
+    def __init__(
+        self,
+        file_name: str,
+        defer_sql_objs: bool = False,
+        conn=None,
+        table_obj=None,
+        sql_table: Optional[str] = None,
+        schema: Optional[str] = None,
+        csv_chunksize=10**6,
+        parquet_chunksize=10**7,
+    ):
+        super().__init__(defer_sql_objs, conn, table_obj, sql_table, csv_chunksize)
+
+        self.parquet_file = pq.ParquetFile(file_name)
+        self.parquet_chunksize = parquet_chunksize
+        self.total_rows = self.parquet_file.metadata.num_rows
+
+        self.logger.info("*** {} ***".format(file_name))
+
+        if self.total_rows > self.parquet_chunksize:
+            if self.total_rows % self.parquet_chunksize:
+                self.total_chunks = (self.total_rows // self.parquet_chunksize) + 1
+            else:
+                self.total_chunks = self.total_rows // self.parquet_chunksize
+
+            self.big_copy = True
+        else:
+            self.total_chunks = 1
+            self.big_copy = False
+
+    def copy(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
+        """
+        Go through sequence to COPY data to PostgreSQL table, including dropping Primary
+        and Foreign Keys to optimize speed, TRUNCATE table, COPY data, recreate keys,
+        and run ANALYZE.
+
+        Parameters
+        ----------
+        data_formatters: list of functions to apply to df during sequence. Note that
+            each of these functions should be able to handle kwargs for one another
+        data_formatter_kwargs: list of kwargs to pass to data_formatters functions
+        """
+        self.drop_fks()
+        self.drop_pk()
+
+        # These need to be one transaction to use COPY FREEZE
+        with self.conn.begin():
+            self.truncate()
+            if self.big_copy:
+                self.big_parquet_to_pg(
+                    data_formatters=data_formatters,
+                    data_formatter_kwargs=data_formatter_kwargs,
+                )
+            else:
+                self.parquet_to_pg(
+                    data_formatters=data_formatters,
+                    data_formatter_kwargs=data_formatter_kwargs,
+                )
+
+        self.create_pk()
+        self.create_fks()
+        self.analyze()
+
+    def parquet_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
+        self.logger.info("Reading Parquet file")
+        df = self.parquet_file.read().to_pandas()
+
+        self.logger.info("Formatting data")
+        df = self.data_formatting(
+            df, functions=data_formatters, **data_formatter_kwargs
+        )
+
+        self.logger.info("Creating generator for chunking dataframe")
+        for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):
+            self.logger.info("Creating CSV in memory")
+            fo = create_file_object(chunk)
+
+            self.logger.info("Copying chunk to database")
+            self.copy_from_file(fo)
+            del fo
+        del df
+        self.logger.info(f"All chunks copied ({self.total_rows} rows)")
+
+    def big_parquet_to_pg(
+        self, data_formatters=[cast_pandas], data_formatter_kwargs={}
+    ):
+        copied_rows = 0
+        n_chunk = 0
+        for batch in self.parquet_file.iter_batches(batch_size=self.parquet_chunksize):
+            n_chunk += 1
+            self.logger.info(f"*** Parquet chunk {n_chunk} of {self.total_chunks} ***")
+            df = batch.to_pandas()
+            batch_rows = df.shape[0]
+
+            self.logger.info("Formatting data")
+            df = self.data_formatting(
+                df, functions=data_formatters, **data_formatter_kwargs
+            )
+
+            self.logger.info("Creating generator for chunking dataframe")
+            for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):
+                self.logger.info("Creating CSV in memory")
+                fo = create_file_object(chunk)
+
+                self.logger.info("Copying chunk to database")
+                self.copy_from_file(fo)
+                del fo
+            del df
+
+            copied_rows += batch_rows
+
+            self.logger.info(f"Copied {copied_rows:,} of {self.total_rows:,} rows")
+
+        self.logger.info(f"All chunks copied ({self.total_rows:,} rows)")
diff --git a/requirements.txt b/requirements.txt
index 02c6b15..e95ae8d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@ pandas>=0.22.0
 psycopg2-binary==2.7.5
 SQLAlchemy==1.3.5
 tables
+pyarrow

From 064ce6a0fd01bbf04342ec3cb1296165d55681a8 Mon Sep 17 00:00:00 2001
From: Brendan Leonard
Date: Wed, 13 Mar 2024 16:45:19 -0400
Subject: [PATCH 5/6] remove explicit commits()

---
 pandas_to_postgres/_base_copy.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/pandas_to_postgres/_base_copy.py b/pandas_to_postgres/_base_copy.py
index b103608..b476f3c 100644
--- a/pandas_to_postgres/_base_copy.py
+++ b/pandas_to_postgres/_base_copy.py
@@ -76,7 +76,6 @@ def drop_pk(self):
             self.logger.info(
                 "{} primary key not found. Skipping".format(self.sql_table)
             )
-        self.conn.commit()
 
     def create_pk(self):
         """Create primary key constraints on PostgreSQL table"""
@@ -87,7 +86,6 @@ def create_pk(self):
             self.logger.warn(
                 "Error creating foreign key {}".format(self.primary_key.name)
             )
-        self.conn.commit()
 
     def drop_fks(self):
         """Drop foreign key constraints on PostgreSQL table"""
@@ -97,7 +95,6 @@ def drop_fks(self):
                 self.conn.execute(DropConstraint(fk))
             except SQLAlchemyError:
                 self.logger.warn("Foreign key {} not found".format(fk.name))
-        self.conn.commit()
 
     def create_fks(self):
         """Create foreign key constraints on PostgreSQL table"""

From cea9b9d6515bbc0e47d66ef3ac93d73aca7d5ee9 Mon Sep 17 00:00:00 2001
From: Brendan Leonard
Date: Mon, 20 May 2024 16:57:42 -0400
Subject: [PATCH 6/6] commit execute commands

---
 pandas_to_postgres/_base_copy.py | 36 +++++++++++++++++++-------------
 1 file changed, 21 insertions(+), 15 deletions(-)

diff --git a/pandas_to_postgres/_base_copy.py b/pandas_to_postgres/_base_copy.py
index b476f3c..a9fa8aa 100644
--- a/pandas_to_postgres/_base_copy.py
+++ b/pandas_to_postgres/_base_copy.py
@@ -69,54 +69,62 @@ def drop_pk(self):
         Drop primary key constraints on PostgreSQL table as well as CASCADE any other
         constraints that may rely on the PK
         """
-        self.logger.info("Dropping {} primary key".format(self.sql_table))
+        self.logger.info(f"Dropping {self.sql_table} primary key")
         try:
             self.conn.execute(DropConstraint(self.primary_key, cascade=True))
+            self.conn.commit()
         except SQLAlchemyError:
+            self.conn.rollback()
             self.logger.info(
-                "{} primary key not found. Skipping".format(self.sql_table)
+                f"{self.sql_table} primary key not found. Skipping"
             )
 
     def create_pk(self):
         """Create primary key constraints on PostgreSQL table"""
-        self.logger.info("Creating {} primary key".format(self.sql_table))
+        self.logger.info(f"Creating {self.sql_table} primary key")
         try:
             self.conn.execute(AddConstraint(self.primary_key))
+            self.conn.commit()
         except SQLAlchemyError:
+            self.conn.rollback()
             self.logger.warn(
-                "Error creating foreign key {}".format(self.primary_key.name)
+                f"Error creating foreign key {self.primary_key.name}"
             )
 
     def drop_fks(self):
         """Drop foreign key constraints on PostgreSQL table"""
         for fk in self.foreign_keys:
-            self.logger.info("Dropping foreign key {}".format(fk.name))
+            self.logger.info(f"Dropping foreign key {fk.name}")
             try:
                 self.conn.execute(DropConstraint(fk))
+                self.conn.commit()
             except SQLAlchemyError:
-                self.logger.warn("Foreign key {} not found".format(fk.name))
+                self.conn.rollback()
+                self.logger.warn(f"Foreign key {fk.name} not found")
 
     def create_fks(self):
         """Create foreign key constraints on PostgreSQL table"""
         for fk in self.foreign_keys:
             try:
-                self.logger.info("Creating foreign key {}".format(fk.name))
+                self.logger.info(f"Creating foreign key {fk.name}")
                 self.conn.execute(AddConstraint(fk))
+                self.conn.commit()
             except SQLAlchemyError:
-                self.logger.warn("Error creating foreign key {}".format(fk.name))
+                self.conn.rollback()
+                self.logger.warn(f"Error creating foreign key {fk.name}")
 
     def truncate(self):
         """TRUNCATE PostgreSQL table"""
-        self.logger.info("Truncating {}".format(self.sql_table))
+        self.logger.info(f"Truncating {self.sql_table}")
         self.conn.execution_options(autocommit=True).execute(
-            text("TRUNCATE TABLE {};".format(self.sql_table))
+            text(f"TRUNCATE TABLE {self.sql_table};")
         )
 
     def analyze(self):
         """Run ANALYZE on PostgreSQL table"""
-        self.logger.info("Analyzing {}".format(self.sql_table))
+        self.logger.info(f"Analyzing {self.sql_table}")
         self.conn.execution_options(autocommit=True).execute(
-            text("ANALYZE {};".format(self.sql_table))
+            text(f"ANALYZE {self.sql_table};")
         )
 
     def copy_from_file(self, file_object):
         """
@@ -132,9 +140,7 @@ def copy_from_file(self, file_object):
         file_object.seek(0)
         columns = file_object.readline()
 
-        sql = "COPY {table} ({columns}) FROM STDIN WITH CSV FREEZE".format(
-            table=self.sql_table, columns=columns
-        )
+        sql = f"COPY {self.sql_table} ({columns}) FROM STDIN WITH CSV FREEZE"
         cur.copy_expert(sql=sql, file=file_object)
 
     def data_formatting(self, df, functions=[], **kwargs):
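
A minimal usage sketch of what this series enables, combining the schema-qualified table handling from PATCH 1 with the ParquetCopy loader added in PATCH 4. The connection URL, schema, table, and file path below are placeholders, and the snippet assumes SQLAlchemy 1.4+/2.0-style connections (which the conn.commit()/text() calls in the later patches imply) and a target table that already exists; it mirrors the README example rather than documenting a tested API.

```python
# Hypothetical end-to-end example -- URL, schema, table, and path are placeholders.
from sqlalchemy import MetaData, Table, create_engine

from pandas_to_postgres import ParquetCopy

engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/mydb")
metadata = MetaData(schema="analytics")  # non-"public" schema, per PATCH 1

with engine.connect() as conn:
    # Reflect the existing target table; because table_obj.schema is set,
    # BaseCopy builds a schema-qualified name ("analytics.sales") that is
    # used for TRUNCATE / COPY / ANALYZE.
    sales = Table("sales", metadata, autoload_with=conn)

    # Drops PK/FK constraints, TRUNCATEs, COPYs the file in chunks,
    # re-creates the keys, then ANALYZEs -- the sequence ParquetCopy.copy()
    # implements in PATCH 4.
    ParquetCopy("/path/to/sales.parquet", conn=conn, table_obj=sales).copy()
```

For files larger than parquet_chunksize (10**7 rows by default) the loader switches to batched reads via big_parquet_to_pg automatically, so the call above stays the same either way.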