Skip to content

Commit

Permalink
Stop making microbatch batches with filters that will never have any …
Browse files Browse the repository at this point in the history
…rows

Our logic previously created batches for each batch period where
batch_start <= event_end_time. This was problematic when a batch_start
equaled the event_end_time because a batch would be produced with a filter
like `WHERE event_time >= '2024-01-01 00:00:00' AND event_time < '2024-01-01 00:00:00'`.
The two conditions in that filter would logically exclude each other, meaning that
0 rows would be selected _always_. Thus we've changed the batch creation logic
to be batch_start `<` event_end_time (as opposed to `<=`), which stops the
bad batch filter from being a possibility.
  • Loading branch information
QMalcolm committed Oct 4, 2024
1 parent 6b9c1da commit 30c9aea
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
2 changes: 1 addition & 1 deletion core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]:
)

batches: List[BatchType] = [(curr_batch_start, curr_batch_end)]
while curr_batch_end <= end:
while curr_batch_end < end:
curr_batch_start = curr_batch_end
curr_batch_end = MicrobatchBuilder.offset_timestamp(curr_batch_start, batch_size, 1)
batches.append((curr_batch_start, curr_batch_end))
Expand Down
61 changes: 61 additions & 0 deletions tests/unit/materializations/incremental/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,67 @@ def test_build_start_time(
),
],
),
# Test when event_time_end matches the truncated batch size
(
datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC),
datetime(2026, 1, 1, 0, 0, 0, 0, pytz.UTC),
BatchSize.year,
[
(
datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC),
datetime(2025, 1, 1, 0, 0, 0, 0, pytz.UTC),
),
(
datetime(2025, 1, 1, 0, 0, 0, 0, pytz.UTC),
datetime(2026, 1, 1, 0, 0, 0, 0, pytz.UTC),
),
],
),
(
datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC),
datetime(2024, 11, 1, 0, 0, 0, 0, pytz.UTC),
BatchSize.month,
[
(
datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC),
datetime(2024, 10, 1, 0, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 10, 1, 0, 0, 0, 0, pytz.UTC),
datetime(2024, 11, 1, 0, 0, 0, 0, pytz.UTC),
),
],
),
(
datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC),
datetime(2024, 9, 7, 0, 0, 0, 0, pytz.UTC),
BatchSize.day,
[
(
datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC),
datetime(2024, 9, 6, 0, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 9, 6, 0, 0, 0, 0, pytz.UTC),
datetime(2024, 9, 7, 0, 0, 0, 0, pytz.UTC),
),
],
),
(
datetime(2024, 9, 5, 1, 0, 0, 0, pytz.UTC),
datetime(2024, 9, 5, 3, 0, 0, 0, pytz.UTC),
BatchSize.hour,
[
(
datetime(2024, 9, 5, 1, 0, 0, 0, pytz.UTC),
datetime(2024, 9, 5, 2, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 9, 5, 2, 0, 0, 0, pytz.UTC),
datetime(2024, 9, 5, 3, 0, 0, 0, pytz.UTC),
),
],
),
],
)
def test_build_batches(self, microbatch_model, start, end, batch_size, expected_batches):
Expand Down

0 comments on commit 30c9aea

Please sign in to comment.