diff --git a/.changes/unreleased/Fixes-20241004-133630.yaml b/.changes/unreleased/Fixes-20241004-133630.yaml new file mode 100644 index 00000000000..7d23bda0c13 --- /dev/null +++ b/.changes/unreleased/Fixes-20241004-133630.yaml @@ -0,0 +1,7 @@ +kind: Fixes +body: Handle edge cases when a specified `--event-time-end` is equivalent to the batch + size truncated batch start time +time: 2024-10-04T13:36:30.357936-05:00 +custom: + Author: QMalcolm + Issue: "10824" diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index 752793e124b..268132ffa0c 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -65,9 +65,15 @@ def build_start_time(self, checkpoint: Optional[datetime]): return MicrobatchBuilder.truncate_timestamp(self.model.config.begin, batch_size) lookback = self.model.config.lookback - start = MicrobatchBuilder.offset_timestamp(checkpoint, batch_size, -1 * lookback) - return start + # If the checkpoint is equivalent to itself truncated then the checkpoint stradles + # the batch line. In this case the last batch will end with the checkpoint, but start + # should be the previous hour/day/month/year. Thus we need to increase the lookback by + # 1 to get this affect properly. + if checkpoint == MicrobatchBuilder.truncate_timestamp(checkpoint, batch_size): + lookback += 1 + + return MicrobatchBuilder.offset_timestamp(checkpoint, batch_size, -1 * lookback) def build_batches(self, start: datetime, end: datetime) -> List[BatchType]: """ @@ -81,7 +87,7 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]: ) batches: List[BatchType] = [(curr_batch_start, curr_batch_end)] - while curr_batch_end <= end: + while curr_batch_end < end: curr_batch_start = curr_batch_end curr_batch_end = MicrobatchBuilder.offset_timestamp(curr_batch_start, batch_size, 1) batches.append((curr_batch_start, curr_batch_end)) diff --git a/tests/unit/materializations/incremental/test_microbatch.py b/tests/unit/materializations/incremental/test_microbatch.py index f5982033be9..5c368f535a9 100644 --- a/tests/unit/materializations/incremental/test_microbatch.py +++ b/tests/unit/materializations/incremental/test_microbatch.py @@ -245,6 +245,70 @@ def test_build_end_time( 1, datetime(2024, 9, 5, 7, 0, 0, 0, pytz.UTC), ), + ( + True, + None, + datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC), + BatchSize.hour, + 0, + datetime(2024, 9, 4, 23, 0, 0, 0, pytz.UTC), + ), + ( + True, + None, + datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC), + BatchSize.hour, + 1, + datetime(2024, 9, 4, 22, 0, 0, 0, pytz.UTC), + ), + ( + True, + None, + datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC), + BatchSize.day, + 0, + datetime(2024, 9, 4, 0, 0, 0, 0, pytz.UTC), + ), + ( + True, + None, + datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC), + BatchSize.day, + 1, + datetime(2024, 9, 3, 0, 0, 0, 0, pytz.UTC), + ), + ( + True, + None, + datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), + BatchSize.month, + 0, + datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), + ), + ( + True, + None, + datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), + BatchSize.month, + 1, + datetime(2024, 7, 1, 0, 0, 0, 0, pytz.UTC), + ), + ( + True, + None, + datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC), + BatchSize.year, + 0, + datetime(2023, 1, 1, 0, 0, 0, 0, pytz.UTC), + ), + ( + True, + None, + datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC), + BatchSize.year, + 1, + datetime(2022, 1, 1, 0, 0, 0, 0, pytz.UTC), + ), ], ) def test_build_start_time( @@ -351,6 +415,67 @@ def test_build_start_time( ), ], ), + # Test when event_time_end matches the truncated batch size + ( + datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC), + datetime(2026, 1, 1, 0, 0, 0, 0, pytz.UTC), + BatchSize.year, + [ + ( + datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC), + datetime(2025, 1, 1, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2025, 1, 1, 0, 0, 0, 0, pytz.UTC), + datetime(2026, 1, 1, 0, 0, 0, 0, pytz.UTC), + ), + ], + ), + ( + datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), + datetime(2024, 11, 1, 0, 0, 0, 0, pytz.UTC), + BatchSize.month, + [ + ( + datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), + datetime(2024, 10, 1, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 10, 1, 0, 0, 0, 0, pytz.UTC), + datetime(2024, 11, 1, 0, 0, 0, 0, pytz.UTC), + ), + ], + ), + ( + datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC), + datetime(2024, 9, 7, 0, 0, 0, 0, pytz.UTC), + BatchSize.day, + [ + ( + datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC), + datetime(2024, 9, 6, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 9, 6, 0, 0, 0, 0, pytz.UTC), + datetime(2024, 9, 7, 0, 0, 0, 0, pytz.UTC), + ), + ], + ), + ( + datetime(2024, 9, 5, 1, 0, 0, 0, pytz.UTC), + datetime(2024, 9, 5, 3, 0, 0, 0, pytz.UTC), + BatchSize.hour, + [ + ( + datetime(2024, 9, 5, 1, 0, 0, 0, pytz.UTC), + datetime(2024, 9, 5, 2, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 9, 5, 2, 0, 0, 0, pytz.UTC), + datetime(2024, 9, 5, 3, 0, 0, 0, pytz.UTC), + ), + ], + ), ], ) def test_build_batches(self, microbatch_model, start, end, batch_size, expected_batches):