Update MicrobatchBuilder.build_start_time to not create start times that equal the checkpoint

Previously if the checkpoint provided to `build_start_time` was at the truncation point
for the related batch size, then the returned `start_time` would be _the same_ as the checkpoint.
For example if the checkpoint was "2024-09-05 00:00:00", and the batch size was `day`, then the
returned `start_time` would be "2024-09-05 00:00:00". This is problematic because then there would
be no batch created when running `build_batches`. Or, prior to 12bb2ca, you'd get one batch with
a filter like `event_time >= 2024-09-05 00:00:00 AND event_time < 2024-09-05 00:00:00` which is
impossible to satisfy.

The change in this PR makes it so that if the checkpoint is at the truncation point, then the start
time will be guaranteed to move back by one batch period. That is, following the same example,
"2024-09-04 00:00:00" would be returned.
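
The behavior described above can be illustrated with a minimal, self-contained sketch. The helpers `truncate`, `offset`, `start_time_old`, `start_time_new`, and `count_batches` are hypothetical stand-ins written for this illustration, not the real `MicrobatchBuilder` methods. The sketch assumes that offsetting truncates the timestamp first, it supports only `hour` and `day` batch sizes, and it uses a lookback of 0 to match the example in the message.

```python
from datetime import datetime, timedelta

# Hypothetical stand-ins for the MicrobatchBuilder helpers (assumption: only
# "hour" and "day" batch sizes, to keep the sketch short).
_PERIODS = {"hour": timedelta(hours=1), "day": timedelta(days=1)}


def truncate(ts: datetime, batch_size: str) -> datetime:
    """Round ts down to the start of its hour or day."""
    if batch_size == "hour":
        return ts.replace(minute=0, second=0, microsecond=0)
    if batch_size == "day":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported batch size: {batch_size}")


def offset(ts: datetime, batch_size: str, n: int) -> datetime:
    """Truncate ts, then shift it by n batch periods (assumed behavior)."""
    return truncate(ts, batch_size) + n * _PERIODS[batch_size]


def start_time_old(checkpoint: datetime, batch_size: str, lookback: int) -> datetime:
    """Start time as computed before this commit."""
    return offset(checkpoint, batch_size, -1 * lookback)


def start_time_new(checkpoint: datetime, batch_size: str, lookback: int) -> datetime:
    """Start time after this commit: step back one extra period when the
    checkpoint already sits exactly on a batch boundary."""
    if checkpoint == truncate(checkpoint, batch_size):
        lookback += 1
    return offset(checkpoint, batch_size, -1 * lookback)


def count_batches(start: datetime, end: datetime, batch_size: str) -> int:
    """Count the [batch_start, batch_end) windows needed to cover start..end."""
    count = 0
    current = start
    while current < end:
        count += 1
        current += _PERIODS[batch_size]
    return count


checkpoint = datetime(2024, 9, 5)  # exactly on the day boundary
old_start = start_time_old(checkpoint, "day", 0)
new_start = start_time_new(checkpoint, "day", 0)

print(old_start, count_batches(old_start, checkpoint, "day"))
print(new_start, count_batches(new_start, checkpoint, "day"))

# A checkpoint that is not on the boundary is unaffected by the change.
mid_day_start = start_time_new(datetime(2024, 9, 5, 13, 30), "day", 0)
print(mid_day_start)
```

In this sketch the old calculation returns a start equal to the checkpoint, so no batch window fits between them, while the new calculation returns the previous day and yields exactly one batch.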
QMalcolm committed Oct 8, 2024
1 parent 11d3617 commit 4e3c4b4
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions core/dbt/materializations/incremental/microbatch.py
@@ -65,9 +65,15 @@ def build_start_time(self, checkpoint: Optional[datetime]):
             return MicrobatchBuilder.truncate_timestamp(self.model.config.begin, batch_size)
 
         lookback = self.model.config.lookback
-        start = MicrobatchBuilder.offset_timestamp(checkpoint, batch_size, -1 * lookback)
 
-        return start
+        # If the checkpoint is equivalent to itself truncated then the checkpoint straddles
+        # the batch line. In this case the last batch will end with the checkpoint, but start
+        # should be the previous hour/day/month/year. Thus we need to increase the lookback by
+        # 1 to get this effect properly.
+        if checkpoint == MicrobatchBuilder.truncate_timestamp(checkpoint, batch_size):
+            lookback += 1
+
+        return MicrobatchBuilder.offset_timestamp(checkpoint, batch_size, -1 * lookback)
 
     def build_batches(self, start: datetime, end: datetime) -> List[BatchType]:
         """