Skip to content

Commit

Permalink
Add c to parent tables ending in __ (singer-io#51)
Browse files Browse the repository at this point in the history
* add c to parent tables ending in __

* fix bug with bookmark at the end of chunked sync

* fix bug with using a copy of completed batch ids
  • Loading branch information
KAllan357 authored Jul 12, 2018
1 parent 721bf1f commit d195c3d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
13 changes: 10 additions & 3 deletions tap_salesforce/salesforce/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@

# pylint: disable=inconsistent-return-statements
def find_parent(stream):
parent_stream = stream
if stream.endswith("CleanInfo"):
return stream[:stream.find("CleanInfo")]
parent_stream = stream[:stream.find("CleanInfo")]
elif stream.endswith("History"):
return stream[:stream.find("History")]
parent_stream = stream[:stream.find("History")]

# If the stripped stream ends with "__" we can assume the parent is a custom table
if parent_stream.endswith("__"):
parent_stream += 'c'

return parent_stream


class Bulk(object):
Expand Down Expand Up @@ -97,7 +104,7 @@ def _bulk_query(self, catalog_entry, state):
# Add the bulk Job ID and its batches to the state so it can be resumed if necessary
tap_stream_id = catalog_entry['tap_stream_id']
state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id)
state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'])
state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:])

for completed_batch_id in batch_status['completed']:
for result in self.get_batch_results(job_id, completed_batch_id, catalog_entry):
Expand Down
2 changes: 1 addition & 1 deletion tap_salesforce/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def sync_records(sf, catalog_entry, state, counter):
state,
catalog_entry['tap_stream_id'],
replication_key,
singer_utils.strptime(chunked_bookmark))
singer_utils.strftime(chunked_bookmark))

def fix_record_anytype(rec, schema):
"""Modifies a record when the schema has no 'type' element due to a SF type of 'anyType.'
Expand Down

0 comments on commit d195c3d

Please sign in to comment.