diff --git a/tap_salesforce/salesforce/bulk.py b/tap_salesforce/salesforce/bulk.py index 9a9fae90..722042da 100644 --- a/tap_salesforce/salesforce/bulk.py +++ b/tap_salesforce/salesforce/bulk.py @@ -20,10 +20,17 @@ # pylint: disable=inconsistent-return-statements def find_parent(stream): + parent_stream = stream if stream.endswith("CleanInfo"): - return stream[:stream.find("CleanInfo")] + parent_stream = stream[:stream.find("CleanInfo")] elif stream.endswith("History"): - return stream[:stream.find("History")] + parent_stream = stream[:stream.find("History")] + + # If the stripped stream ends with "__" we can assume the parent is a custom table + if parent_stream.endswith("__"): + parent_stream += 'c' + + return parent_stream class Bulk(object): @@ -97,7 +104,7 @@ def _bulk_query(self, catalog_entry, state): # Add the bulk Job ID and its batches to the state so it can be resumed if necessary tap_stream_id = catalog_entry['tap_stream_id'] state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) - state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed']) + state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:]) for completed_batch_id in batch_status['completed']: for result in self.get_batch_results(job_id, completed_batch_id, catalog_entry): diff --git a/tap_salesforce/sync.py b/tap_salesforce/sync.py index 6271436b..8678b2f9 100644 --- a/tap_salesforce/sync.py +++ b/tap_salesforce/sync.py @@ -166,7 +166,7 @@ def sync_records(sf, catalog_entry, state, counter): state, catalog_entry['tap_stream_id'], replication_key, - singer_utils.strptime(chunked_bookmark)) + singer_utils.strftime(chunked_bookmark)) def fix_record_anytype(rec, schema): """Modifies a record when the schema has no 'type' element due to a SF type of 'anyType.'