Rewrite updates #13

Open: wants to merge 7 commits into master

1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
.DS_Store
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
12 changes: 9 additions & 3 deletions README.md
@@ -11,16 +11,17 @@ Pandas-to-postgres allows you to bulk load the contents of large dataframes into
- Removes indexing overhead by automatically detecting and dropping indexes before load, and then re-creating them afterwards
- Allows you to load multiple separate HDF tables in parallel using multiprocessing.Pool
- Works around pandas null value representation issues: float pandas columns that have an integer SQL type get converted into an object column with int values where applicable and NaN elsewhere.
- Provides hooks to modify data as it's loaded (see the sketch below)

Anecdotally, we use this to load approximately 640 million rows of data from a 7.1GB HDF file (zlib compressed), 75% of it spread across 3 of 23 tables, with a mean number of columns of 6. We load this into an m4.xlarge RDS instance running postgres 10.3 in 54 minutes (approximately 10-15 minutes of which is recreating indexes), using 4 threads.
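
As a rough illustration of the hook mechanism, a custom formatter is just a function that receives the DataFrame (plus any keyword arguments the copy object forwards) and returns it. This is a minimal sketch, not part of the library: `normalize_codes`, the `code` column, and the `conn`/`table_obj` objects are placeholders, with connection setup assumed to follow the Usage Example below.

```python
from pandas_to_postgres import DataFrameCopy, cast_pandas

def normalize_codes(df, copy_obj=None, **kwargs):
    # Hypothetical hook: upper-case a text column before it is written out via COPY
    df["code"] = df["code"].str.upper()
    return df

# conn and table_obj are set up as in the Usage Example below
DataFrameCopy(df, conn=conn, table_obj=table_obj).copy(functions=[cast_pandas, normalize_codes])
```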

# Dependencies

- Python 3
- psycopg2 (for the low level COPY from stdin)
- sqlalchemy (for reflection for indexes)
- pandas
- pyarrow (for copying Parquet files)

# Usage Example

@@ -41,9 +42,14 @@ hdf_to_postgres('./data.h5', engine_args=["psycopg://..."])

# Parallel HDF from file
hdf_to_postgres('./data.h5', engine_args=["psycopg://..."], processes=4)

# Parquet file
with db.engine.connect() as c:
ParquetCopy("/path/to/file.parquet", conn=c, table_obj=table_model).copy()

```
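
For the Parquet example above, `table_model` is a SQLAlchemy `Table` for an existing table. A minimal sketch of one way to obtain it via reflection, assuming SQLAlchemy 1.4+ and placeholder connection string and table name:

```python
from sqlalchemy import create_engine, MetaData, Table
from pandas_to_postgres import ParquetCopy

engine = create_engine("postgresql://user:pass@localhost/mydb")  # placeholder URL
metadata = MetaData()
table_model = Table("my_table", metadata, autoload_with=engine)  # reflect the existing table

with engine.connect() as conn:
    ParquetCopy("/path/to/file.parquet", conn=conn, table_obj=table_model).copy()
```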

# Other Comparisons
- [Odo](http://odo.pydata.org/): A much more general tool that provides some similar features across many formats and databases, but missing a lot of our specific features. Unfortunately currently buggy and unmaintained.
- [Postgres Binary Parser](https://github.com/spitz-dan-l/postgres-binary-parser): Uses `COPY WITH BINARY` to remove the pandas to csv bottleneck, but didn't provide as good an improvement for us.
- [pg_bulkload](https://github.com/ossc-db/pg_bulkload): The industry standard, has some overlap with us. Works extremely well if you have CSV files, but not if you have any other format (you'd have to write your own chunked read/write code and pipe it through, at which point you might as well use ours). Judging by benchmarks we're in the same ballpark. Could perhaps replace psycopg2 as our backend eventually.
1 change: 1 addition & 0 deletions pandas_to_postgres/__init__.py
@@ -1,5 +1,6 @@
from .copy_df import DataFrameCopy
from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
from .copy_parquet import ParquetCopy
from .hdf_to_postgres import hdf_to_postgres, create_hdf_table_objects, copy_worker
from .utilities import (
hdf_metadata,
60 changes: 39 additions & 21 deletions pandas_to_postgres/_base_copy.py
@@ -1,6 +1,7 @@
from .utilities import get_logger
from sqlalchemy.schema import AddConstraint, DropConstraint
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.sql import text


class BaseCopy(object):
@@ -14,7 +15,7 @@ def __init__(
conn=None,
table_obj=None,
sql_table=None,
csv_chunksize=10 ** 6,
csv_chunksize=10**6,
):
"""
Parameters
@@ -55,7 +56,10 @@ def instantiate_attrs(self, conn, table_obj):
"""
self.conn = conn
self.table_obj = table_obj
self.sql_table = table_obj.name
if table_obj.schema:
self.sql_table = f"{table_obj.schema}.{table_obj.name}"
else:
self.sql_table = table_obj.name
self.logger = get_logger(self.sql_table)
self.primary_key = table_obj.primary_key
self.foreign_keys = table_obj.foreign_key_constraints
@@ -65,48 +69,63 @@ def drop_pk(self):
Drop primary key constraints on PostgreSQL table as well as CASCADE any other
constraints that may rely on the PK
"""
self.logger.info("Dropping {} primary key".format(self.sql_table))
self.logger.info(f"Dropping {self.sql_table} primary key")
try:
with self.conn.begin_nested():
self.conn.execute(DropConstraint(self.primary_key, cascade=True))
self.conn.execute(DropConstraint(self.primary_key, cascade=True))
self.conn.commit()
except SQLAlchemyError:
self.conn.rollback()
self.logger.info(
"{} primary key not found. Skipping".format(self.sql_table)
f"{self.sql_table} primary key not found. Skipping"
)

def create_pk(self):
"""Create primary key constraints on PostgreSQL table"""
self.logger.info("Creating {} primary key".format(self.sql_table))
self.conn.execute(AddConstraint(self.primary_key))
self.logger.info(f"Creating {self.sql_table} primary key")
try:
self.conn.execute(AddConstraint(self.primary_key))
self.conn.commit()
except SQLAlchemyError:
self.conn.rollback()
self.logger.warn(
f"Error creating primary key {self.primary_key.name}"
)

def drop_fks(self):
"""Drop foreign key constraints on PostgreSQL table"""
for fk in self.foreign_keys:
self.logger.info("Dropping foreign key {}".format(fk.name))
self.logger.info(f"Dropping foreign key {fk.name}")
try:
with self.conn.begin_nested():
self.conn.execute(DropConstraint(fk))
self.conn.execute(DropConstraint(fk))
self.conn.commit()
except SQLAlchemyError:
self.logger.warn("Foreign key {} not found".format(fk.name))
self.conn.rollback()
self.logger.warn(f"Foreign key {fk.name} not found")

def create_fks(self):
"""Create foreign key constraints on PostgreSQL table"""
for fk in self.foreign_keys:
try:
self.logger.info("Creating foreign key {}".format(fk.name))
self.logger.info(f"Creating foreign key {fk.name}")
self.conn.execute(AddConstraint(fk))
self.conn.commit()
except SQLAlchemyError:
self.logger.warn("Error creating foreign key {}".format(fk.name))
self.conn.rollback()
self.logger.warn(f"Error creating foreign key {fk.name}")

def truncate(self):
"""TRUNCATE PostgreSQL table"""
self.logger.info("Truncating {}".format(self.sql_table))
self.conn.execute("TRUNCATE TABLE {};".format(self.sql_table))
self.logger.info(f"Truncating {self.sql_table}")
self.conn.execution_options(autocommit=True).execute(
text(f"TRUNCATE TABLE {self.sql_table};")
)

def analyze(self):
"""Run ANALYZE on PostgreSQL table"""
self.logger.info("Analyzing {}".format(self.sql_table))
self.conn.execute("ANALYZE {};".format(self.sql_table))
self.logger.info(f"Analyzing {self.sql_table}")
self.conn.execution_options(autocommit=True).execute(
text(f"ANALYZE {self.sql_table};")
)

def copy_from_file(self, file_object):
"""
@@ -120,9 +139,8 @@ def copy_from_file(self, file_object):
cur = self.conn.connection.cursor()
file_object.seek(0)
columns = file_object.readline()
sql = "COPY {table} ({columns}) FROM STDIN WITH CSV FREEZE".format(
table=self.sql_table, columns=columns
)

sql = f"COPY {self.sql_table} ({columns}) FROM STDIN WITH CSV FREEZE"
cur.copy_expert(sql=sql, file=file_object)

def data_formatting(self, df, functions=[], **kwargs):
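
The `copy_from_file` method above boils down to psycopg2's `copy_expert` streaming an in-memory CSV to the server. A stripped-down, standalone sketch of that path (the DSN, table, and data are placeholders, and `FREEZE` is omitted because it only works when the table was created or truncated in the same transaction):

```python
import io
import psycopg2

conn = psycopg2.connect("dbname=mydb user=me")  # placeholder DSN
buf = io.StringIO("id,name\n1,alpha\n2,beta\n")  # header row first, as copy_from_file expects

with conn, conn.cursor() as cur:
    cols = buf.readline().strip()  # consume the header to build the column list
    cur.copy_expert(
        sql=f"COPY my_table ({cols}) FROM STDIN WITH CSV",
        file=buf,  # the remaining rows are streamed from the current position
    )
```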
8 changes: 4 additions & 4 deletions pandas_to_postgres/copy_df.py
@@ -9,7 +9,7 @@ class DataFrameCopy(BaseCopy):
"""

def __init__(
self, df, defer_sql_objs=False, conn=None, table_obj=None, csv_chunksize=10 ** 6
self, df, defer_sql_objs=False, conn=None, table_obj=None, csv_chunksize=10**6
):
"""
Parameters
@@ -35,20 +35,20 @@ def copy(self, functions=[cast_pandas]):
self.drop_fks()
self.drop_pk()
self.df = self.data_formatting(self.df, functions=functions)

with self.conn.begin():
self.truncate()

self.logger.info("Creating generator for chunking dataframe")
for chunk in df_generator(self.df, self.csv_chunksize):

for chunk in df_generator(self.df, self.csv_chunksize, logger=self.logger):
self.logger.info("Creating CSV in memory")
fo = create_file_object(chunk)

self.logger.info("Copying chunk to database")
self.copy_from_file(fo)
del fo

self.logger.info("All chunks copied ({} rows)".format(self.rows))

self.create_pk()
self.create_fks()
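
The chunking in `copy()` relies on `df_generator` to slice the DataFrame into pieces of at most `csv_chunksize` rows. Roughly the idea (a sketch, not the library's actual implementation):

```python
import pandas as pd

def chunk_df(df: pd.DataFrame, chunksize: int):
    # Yield consecutive row slices of at most `chunksize` rows
    for start in range(0, len(df), chunksize):
        yield df.iloc[start:start + chunksize]

df = pd.DataFrame({"x": range(10)})
print([len(chunk) for chunk in chunk_df(df, 4)])  # [4, 4, 2]
```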
126 changes: 126 additions & 0 deletions pandas_to_postgres/copy_parquet.py
@@ -0,0 +1,126 @@
import pyarrow.parquet as pq
from .utilities import create_file_object, df_generator, cast_pandas
from ._base_copy import BaseCopy
from typing import Optional


class ParquetCopy(BaseCopy):
"""
Class for handling a standard case of reading a Parquet file into a Pandas
DataFrame, iterating over it in chunks, and COPYing to PostgreSQL via StringIO CSV
"""

def __init__(
self,
file_name: str,
defer_sql_objs: bool = False,
conn=None,
table_obj=None,
sql_table: Optional[str] = None,
schema: Optional[str] = None,
csv_chunksize=10**6,
parquet_chunksize=10**7,
):
super().__init__(defer_sql_objs, conn, table_obj, sql_table, csv_chunksize)

self.parquet_file = pq.ParquetFile(file_name)
self.parquet_chunksize = parquet_chunksize
self.total_rows = self.parquet_file.metadata.num_rows

self.logger.info("*** {} ***".format(file_name))

if self.total_rows > self.parquet_chunksize:
if self.total_rows % self.parquet_chunksize:
self.total_chunks = (self.total_rows // self.parquet_chunksize) + 1
else:
self.total_chunks = self.total_rows // self.parquet_chunksize

self.big_copy = True
else:
self.total_chunks = 1
self.big_copy = False

def copy(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
"""
Go through sequence to COPY data to PostgreSQL table, including dropping Primary
and Foreign Keys to optimize speed, TRUNCATE table, COPY data, recreate keys,
and run ANALYZE.

Parameters
----------
data_formatters: list of functions to apply to df during sequence. Note that
each of these functions should be able to handle kwargs for one another
data_formatter_kwargs: dict of kwargs to pass to data_formatters functions
"""
self.drop_fks()
self.drop_pk()

# These need to be one transaction to use COPY FREEZE
with self.conn.begin():
self.truncate()
if self.big_copy:
self.big_parquet_to_pg(
data_formatters=data_formatters,
data_formatter_kwargs=data_formatter_kwargs,
)
else:
self.parquet_to_pg(
data_formatters=data_formatters,
data_formatter_kwargs=data_formatter_kwargs,
)

self.create_pk()
self.create_fks()
self.analyze()

def parquet_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
self.logger.info("Reading Parquet file")
df = self.parquet_file.read().to_pandas()

self.logger.info("Formatting data")
df = self.data_formatting(
df, functions=data_formatters, **data_formatter_kwargs
)

self.logger.info("Creating generator for chunking dataframe")
for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):
self.logger.info("Creating CSV in memory")
fo = create_file_object(chunk)

self.logger.info("Copying chunk to database")
self.copy_from_file(fo)
del fo
del df
self.logger.info(f"All chunks copied ({self.total_rows} rows)")

def big_parquet_to_pg(
self, data_formatters=[cast_pandas], data_formatter_kwargs={}
):
copied_rows = 0
n_chunk = 0
for batch in self.parquet_file.iter_batches(batch_size=self.parquet_chunksize):
n_chunk += 1
self.logger.info(f"*** Parquet chunk {n_chunk} of {self.total_chunks} ***")
df = batch.to_pandas()
batch_rows = df.shape[0]

self.logger.info("Formatting data")
df = self.data_formatting(
df, functions=data_formatters, **data_formatter_kwargs
)

self.logger.info("Creating generator for chunking dataframe")
for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):
self.logger.info("Creating CSV in memory")
fo = create_file_object(chunk)

self.logger.info("Copying chunk to database")
self.copy_from_file(fo)
del fo
del df

copied_rows += batch_rows

self.logger.info(f"Copied {copied_rows:,} of {self.total_rows:,} rows")

self.logger.info(f"All chunks copied ({self.total_rows:,} rows)")
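
The large-file path above leans on pyarrow's batch iteration rather than reading the whole file into memory. A standalone sketch of that API, assuming a placeholder file path and a pyarrow version recent enough to provide `ParquetFile.iter_batches` (roughly 3.0+):

```python
import pyarrow.parquet as pq

pf = pq.ParquetFile("/path/to/file.parquet")  # placeholder path
print("total rows:", pf.metadata.num_rows)

# Stream RecordBatches instead of materializing the whole table at once
for batch in pf.iter_batches(batch_size=1_000_000):
    df = batch.to_pandas()
    print("read a batch of", len(df), "rows")
```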
9 changes: 5 additions & 4 deletions pandas_to_postgres/utilities.py
@@ -92,7 +92,7 @@ def create_file_object(df):
return file_object


def df_generator(df, chunksize=10 ** 6, logger=None):
def df_generator(df, chunksize=10**6, logger=None):
"""
Create a generator to iterate over chunks of a dataframe

@@ -149,9 +149,10 @@ def cast_pandas(df, columns=None, copy_obj=None, logger=None, **kwargs):
for col in columns:
try:
if str(col.type) in ["INTEGER", "BIGINT"]:
df[col.name] = df[col.name].apply(
lambda x: None if isna(x) else int(x), convert_dtype=False
)
df[col.name] = df[col.name].astype("Int64")
# df[col.name] = df[col.name].apply(
# lambda x: None if isna(x) else int(x), convert_dtype=False
# )
elif str(col.type) == "BOOLEAN":
df[col.name] = df[col.name].apply(
lambda x: None if isna(x) else bool(x), convert_dtype=False
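
The switch to `astype("Int64")` uses pandas' nullable integer dtype, which keeps integer values and `<NA>` in place of missing floats. A quick illustration, assuming pandas 1.0 or later:

```python
import numpy as np
import pandas as pd

s = pd.Series([1.0, np.nan, 3.0])  # float column standing in for a nullable integer SQL column
print(s.astype("Int64").tolist())  # [1, <NA>, 3]
```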
1 change: 1 addition & 0 deletions requirements.txt
@@ -2,3 +2,4 @@ pandas>=0.22.0
psycopg2-binary==2.7.5
SQLAlchemy==1.3.5
tables
pyarrow
4 changes: 2 additions & 2 deletions setup.py
@@ -12,15 +12,15 @@ def read(fname, lines=False):

setup(
name="pandas_to_postgres",
version="v0.0.3",
version="v0.0.4",
author="Brendan Leonard <Harvard CID>",
description=(
"Utility to copy Pandas DataFrames and DataFrames stored in HDF5 files "
"to PostgreSQL "
),
url="http://github.com/cid-harvard/pandas-to-postgres",
packages=find_packages(),
install_requires=["SQLAlchemy", "pandas", "psycopg2", "tables"],
install_requires=["SQLAlchemy", "pandas", "psycopg2-binary", "tables", "pyarrow"],
long_description=read("README.md"),
classifiers=[
"Topic :: Database",