From d195c3ddb6bd7d0f4ccdfcd68c45a61294684a35 Mon Sep 17 00:00:00 2001 From: Kyle Allan Date: Thu, 12 Jul 2018 13:54:57 -0400 Subject: [PATCH] Add c to parent tables ending in __ (#51) * add c to parent tables ending in __ * fix bug with bookmark at the end of chunked sync * fix bug with using a copy of completed batch ids --- tap_salesforce/salesforce/bulk.py | 13 ++++++++++--- tap_salesforce/sync.py | 2 +- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/tap_salesforce/salesforce/bulk.py b/tap_salesforce/salesforce/bulk.py index 9a9fae90..722042da 100644 --- a/tap_salesforce/salesforce/bulk.py +++ b/tap_salesforce/salesforce/bulk.py @@ -20,10 +20,17 @@ # pylint: disable=inconsistent-return-statements def find_parent(stream): + parent_stream = stream if stream.endswith("CleanInfo"): - return stream[:stream.find("CleanInfo")] + parent_stream = stream[:stream.find("CleanInfo")] elif stream.endswith("History"): - return stream[:stream.find("History")] + parent_stream = stream[:stream.find("History")] + + # If the stripped stream ends with "__" we can assume the parent is a custom table + if parent_stream.endswith("__"): + parent_stream += 'c' + + return parent_stream class Bulk(object): @@ -97,7 +104,7 @@ def _bulk_query(self, catalog_entry, state): # Add the bulk Job ID and its batches to the state so it can be resumed if necessary tap_stream_id = catalog_entry['tap_stream_id'] state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) - state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed']) + state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:]) for completed_batch_id in batch_status['completed']: for result in self.get_batch_results(job_id, completed_batch_id, catalog_entry): diff --git a/tap_salesforce/sync.py b/tap_salesforce/sync.py index 6271436b..8678b2f9 100644 --- a/tap_salesforce/sync.py +++ b/tap_salesforce/sync.py @@ -166,7 +166,7 @@ def sync_records(sf, catalog_entry, state, counter): state, catalog_entry['tap_stream_id'], replication_key, - singer_utils.strptime(chunked_bookmark)) + singer_utils.strftime(chunked_bookmark)) def fix_record_anytype(rec, schema): """Modifies a record when the schema has no 'type' element due to a SF type of 'anyType.'