pdfreader and bamboohr paycheck importer #94

Merged · 7 commits · Apr 19, 2024
Changes from 2 commits
62 changes: 62 additions & 0 deletions beancount_reds_importers/importers/bamboohr/__init__.py
@@ -0,0 +1,62 @@
"""BambooHR paycheck importer"""

import re
from dateparser.search import search_dates
from beancount_reds_importers.libreader import pdfreader
from beancount_reds_importers.libtransactionbuilder import paycheck

# BambooHR exports paycheck stubs to pdf, with multiple tables across multiple pages.
# Call this importer with a config that looks like:
#
# bamboohr.Importer({"desc":"Paycheck (My Company)",
# "main_account":"Income:Employment",
# "paycheck_template": {}, # See beancount_reds_importers/libtransactionbuilder/paycheck.py for sample template
# "currency": "PENNIES",
# }),
#
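# A hypothetical filled-in paycheck_template, to show its shape (section titles and
# row labels come from the PDF's tables; the account names here are made up):
#
# "paycheck_template": {
#     "Pay Type": {"Regular Pay": "Income:Employment:Salary"},
#     "Tax Type": {"Federal Income Tax": "Expenses:Taxes:Income:Federal"},
# },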

class Importer(paycheck.Importer, pdfreader.Importer):
    IMPORTER_NAME = 'BambooHR Paycheck'

    def custom_init(self):
        self.max_rounding_error = 0.04
        self.filename_pattern_def = r'PayStub.*\.pdf'
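        # snap/join tolerances tell pdfplumber to merge nearly-aligned line
        # segments when detecting table cells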
        self.pdf_table_extraction_settings = {"join_tolerance": 4, "snap_tolerance": 4}
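        # crop 40 units off the top of each page before table detection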
        self.pdf_table_extraction_crop = (0, 40, 0, 0)
        self.debug = False

        self.funds_db_txt = 'funds_by_ticker'
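        # map the varied column headers BambooHR uses onto the canonical names
        # the paycheck transaction builder expects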
        self.header_map = {
            'Deduction Type': 'description',
            'Pay Type': 'description',
            'Paycheck Total': 'amount',
            'Tax Type': 'description',
        }

        self.currency_fields = ['ytd_total', 'amount']

    def paycheck_date(self, input_file):
        if not self.file_read_done:
            self.read_file(input_file)
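        # search_dates() yields (matched_text, datetime) pairs from the text
        # outside the tables; the third date found is taken to be the pay date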
        dates = [date for _, date in search_dates(self.meta_text)]
        return dates[2].date()

    def prepare_tables(self):
        def normalize_header(label):
            if label in self.header_map:
                return self.header_map[label]

            # otherwise snake_case the label and replace a four-digit year
            # (e.g. '2024') with 'ytd' so year columns share one name
            label = label.lower().replace(' ', '_')
            return re.sub(r'20\d{2}', 'ytd', label)

        for section, table in self.alltables.items():
            # rename columns
            for header in table.header():
                table = table.rename(header, normalize_header(header))
            # convert columns
            table = self.convert_columns(table)

            self.alltables[section] = table

    def build_metadata(self, file, metatype=None, data=None):
        return {'filing_account': self.config['main_account']}
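For context, a sketch of how this importer might be wired into an import config, assuming the usual beancount v2 ingest setup (the file names and accounts here are hypothetical):

    # my_import_config.py (hypothetical)
    from beancount_reds_importers.importers import bamboohr

    CONFIG = [
        bamboohr.Importer({
            "desc": "Paycheck (My Company)",
            "main_account": "Income:Employment",
            "paycheck_template": {},  # start empty; fill in from the debug helpers
            "currency": "PENNIES",
        }),
    ]

Running bean-extract my_import_config.py PayStub-2024-04-05.pdf would then emit the paycheck transaction.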
185 changes: 185 additions & 0 deletions beancount_reds_importers/libreader/pdfreader.py
@@ -0,0 +1,185 @@

from pprint import pformat
import pdfplumber
import pandas as pd
import petl as etl
from beancount_reds_importers.libreader import csvreader

LEFT = 0
TOP = 1
RIGHT = 2
BOTTOM = 3

BLACK = (0, 0, 0)
RED = (255, 0, 0)
PURPLE = (135, 0, 255)
TRANSPARENT = (0, 0, 0, 0)

class Importer(csvreader.Importer):
    """
    A reader that converts a PDF with tables into the multi-petl-table format understood by transaction builders.

    ### Attributes customized in `custom_init`
    self.pdf_table_extraction_settings: `{}`
        a dictionary of settings used to extract tables; see the
        [pdfplumber documentation](https://github.com/jsvine/pdfplumber?tab=readme-ov-file#table-extraction-settings)
        for the available settings

    self.pdf_table_extraction_crop: `(int, int, int, int)`
        a tuple of four values representing distances from the left, top, right, and bottom of the
        page respectively; each page is cropped by these amounts before tables are searched for

    self.pdf_table_title_height: `int`
        how far above the top of a table to look for its title.
        Set to 0 to skip title extraction, in which case sections are labeled `table_#`
        in the order they are encountered

    self.pdf_page_break_top: `int`
        the threshold for detecting page-broken tables. If the top of a table is within this
        distance of the top of the page, it is a candidate for being appended to the previous
        page's last table. Set to 0 to never merge page-broken tables

    self.debug: `boolean`
        When debug is True, a few images and a text file are generated:
        .debug-pdf-metadata-page_#.png
            shows the text available in self.meta_text, with table data blacked out

        .debug-pdf-table-detection-page_#.png
            shows the detected tables with cells outlined in red against a light blue background.
            The purple box shows where the table title is searched for.

        .debug-pdf-data.txt
            a printout of the meta_text and table data found, before being processed into petl
            tables, along with generated helper objects to paste into new importers or import
            configs

    ### Outputs
    self.meta_text: `str`
        contains all text found in the document outside of tables

    self.alltables: `{'table_1': <petl table of first table in document>, ...}`
        contains all tables found in the document, keyed by the extracted title when available,
        otherwise by 1-based index in the form `table_#`
    """
    FILE_EXTS = ['pdf']

    def initialize_reader(self, file):
        if getattr(self, 'file', None) != file:
            self.pdf_table_extraction_settings = {}
            self.pdf_table_extraction_crop = (0, 0, 0, 0)
            self.pdf_table_title_height = 20
            self.pdf_page_break_top = 45
            self.debug = False

            self.meta_text = ''
            self.file = file
            self.file_read_done = False
            self.reader_ready = True

    def file_date(self, file):
        # must be overridden; derive a date from self.alltables or self.meta_text
        raise NotImplementedError("file_date() must be overridden by the importer subclass")

    def prepare_tables(self):
        return

    def read_file(self, file):
        tables = []

        with pdfplumber.open(file.name) as pdf:
            for page_idx, page in enumerate(pdf.pages):
                # all bounding boxes are (left, top, right, bottom)
                adjusted_crop = (
                    min(self.pdf_table_extraction_crop[LEFT], page.width),
                    min(self.pdf_table_extraction_crop[TOP], page.height),
                    max(page.width - self.pdf_table_extraction_crop[RIGHT], 0),
                    max(page.height - self.pdf_table_extraction_crop[BOTTOM], 0),
                )

                # Debug image
                image = page.crop(adjusted_crop).to_image()
                image.debug_tablefinder(tf=self.pdf_table_extraction_settings)

                table_ref = page.crop(adjusted_crop).find_tables(table_settings=self.pdf_table_extraction_settings)
                page_tables = [{'table': i.extract(), 'bbox': i.bbox} for i in table_ref]

                # Get metadata (all data outside tables)
                meta_page = page
                meta_image = meta_page.to_image()
                for table in page_tables:
                    meta_page = meta_page.outside_bbox(table['bbox'])
                    meta_image.draw_rect(table['bbox'], BLACK, RED)

                meta_text = meta_page.extract_text()
                self.meta_text = self.meta_text + meta_text

                # Attach section headers
                for table_idx, table in enumerate(page_tables):
                    section_title_bbox = (
                        table['bbox'][LEFT],
                        max(table['bbox'][TOP] - self.pdf_table_title_height, 0),
                        table['bbox'][RIGHT],
                        table['bbox'][TOP],
                    )
                    section_title = meta_page.crop(section_title_bbox).extract_text()
                    image.draw_rect(section_title_bbox, TRANSPARENT, PURPLE)
                    page_tables[table_idx]['section'] = section_title

                tables = tables + page_tables

                if self.debug:
                    image.save('.debug-pdf-table-detection-page_{}.png'.format(page_idx))
                    meta_image.save('.debug-pdf-metadata-page_{}.png'.format(page_idx))


        # Find and merge page-broken tables: a table at the top of a page with no
        # section title and the same header row as the previous table is assumed to
        # be the previous table continued across a page break
        merged_tables = []
        for table in tables:
            if (
                merged_tables and  # if not the first table,
                table['bbox'][TOP] < self.pdf_page_break_top and  # and its top is close to the top of the page,
                table['section'] == '' and  # and it has no section title,
                merged_tables[-1]['table'][0] == table['table'][0]  # and the header rows match,
            ):  # assume a page break and append the rows, minus the header
                merged_tables[-1]['table'] += table['table'][1:]
                continue
            merged_tables.append(table)
        tables = merged_tables

        # if a table has no section title, name it by its 1-based position
        for table_idx, table in enumerate(tables):
            if table['section'] == '':
                table['section'] = 'table_{}'.format(table_idx + 1)

        if self.debug:
            # generate helper skeletons (header_map, paycheck_template) that can be
            # pasted into a new importer or import config and filled in
            paycheck_template = {}
            header_map = {}
            for table in tables:
                for header in table['table'][0]:
                    header_map[header] = 'overwrite_me'
                paycheck_template[table['section']] = {}
                for row_idx, row in enumerate(table['table']):
                    if row_idx == 0:
                        continue
                    paycheck_template[table['section']][row[0]] = 'overwrite_me'
            if not hasattr(self, 'header_map'):
                self.header_map = header_map
            if not hasattr(self, 'paycheck_template'):
                self.paycheck_template = paycheck_template
            with open('.debug-pdf-data.txt', 'w') as debug_file:
                debug_file.write(pformat({
                    '_output': {
                        'tables': tables,
                        'meta_text': self.meta_text,
                    },
                    '_input': {
                        'table_settings': self.pdf_table_extraction_settings,
                        'crop_settings': self.pdf_table_extraction_crop,
                    },
                    'helpers': {
                        'header_map': self.header_map,
                        'paycheck_template': self.paycheck_template,
                    },
                }))

        self.alltables = {}
        for table in tables:
            self.alltables[table['section']] = etl.fromdataframe(pd.DataFrame(table['table'][1:], columns=table['table'][0]))
Contributor:

I gave this a try and the table extraction worked for a PDF with 10 separate tables. If you want to avoid the dependency on pandas, this could be changed to something like this:

for table in tables:                                                                                             
    t = table["table"]                                                   
    # transpose table to use fromcolumns                                 
    tbl_t = [[t[j][i] for j in range(len(t))] for i in range(len(t[0]))] 
    self.alltables[table["section"]] = etl.fromcolumns(tbl_t)                        

Contributor (Author):

If we can have it work without pandas, that would be great. I'll implement your suggestion and try to get a test case up before the end of the week.

Contributor (Author):

Turns out we can just run etl.wrap(table['table']) rather than preprocessing it.
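That is, a minimal sketch of the final approach (table['table'] is a list of rows with the header row first, which petl's etl.wrap exposes directly as a table):

    self.alltables = {}
    for table in tables:
        self.alltables[table['section']] = etl.wrap(table['table'])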


        self.prepare_tables()
        self.file_read_done = True
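To tie together the attributes documented in the reader's docstring, here is a hypothetical minimal importer built on pdfreader (the class and its settings are illustrative, not part of this PR):

    from beancount_reds_importers.libreader import pdfreader
    from beancount_reds_importers.libtransactionbuilder import paycheck

    class Importer(paycheck.Importer, pdfreader.Importer):
        IMPORTER_NAME = 'Example PDF Paycheck'

        def custom_init(self):
            # tolerate slightly misaligned lines when detecting table cells
            self.pdf_table_extraction_settings = {"snap_tolerance": 4}
            # ignore a 40-unit band at the top of each page
            self.pdf_table_extraction_crop = (0, 40, 0, 0)
            # look 25 units above each table for its title
            self.pdf_table_title_height = 25
            # tables starting within 45 units of the page top may be page-broken
            self.pdf_page_break_top = 45
            # emit .debug-pdf-*.png and .debug-pdf-data.txt while developing
            self.debug = True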