Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure consistent current_time across microbatch models in an invocation #10830

Merged
merged 6 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241007-115853.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Ensure microbatch models use same `current_time` value
time: 2024-10-07T11:58:53.460941-05:00
custom:
Author: QMalcolm
Issue: "10819"
6 changes: 5 additions & 1 deletion core/dbt/config/runtime.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import itertools
import os
from copy import deepcopy
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import (
Any,
Expand All @@ -15,6 +16,8 @@
Type,
)

import pytz

from dbt import tracking
from dbt.adapters.contracts.connection import (
AdapterRequiredConfig,
Expand Down Expand Up @@ -98,6 +101,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
profile_name: str
cli_vars: Dict[str, Any]
dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None
invoked_at: datetime = field(default_factory=lambda: datetime.now(pytz.UTC))

def __post_init__(self):
self.validate()
Expand Down
4 changes: 3 additions & 1 deletion core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(
is_incremental: bool,
event_time_start: Optional[datetime],
event_time_end: Optional[datetime],
default_end_time: Optional[datetime] = None,
):
if model.config.incremental_strategy != "microbatch":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we in the right branch for batch_current_time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand, can you elaborate on "in the right branch"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is in reference to batch vs microbatch. For better or worse, we've been kinda using them interchangeably 🙈

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A cursory glance led me to believe that we have two branches – microbatch and default.
If we're not in the microbatch branch, I assume we're in the default branch.
If this is the case, does batch_current_time make sense here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TL;DR

batch_current_time (now renamed to default_end_time) is only relevant to microbatch models


This file (microbatch.py) and the class itself (MicrobatchBuilder) should only ever be invoked when running on a microbatch model. That logic for separating whether we in a "default" (a.k.a. non-microbatch) model or a microbatch model lives in run.py in the execute() method. Thus, if we're in this code (microbatch.py), we're inherently in a microbatch context / logic branch. Hope that all makes sense. I sometimes forget that the rest of the team doesn't automatically have the context of how microbatch models work 😅 Let me know if you want a walk through the microbatch code at large if that'd be useful 🙂

raise DbtInternalError(
Expand All @@ -35,10 +36,11 @@ def __init__(
event_time_start.replace(tzinfo=pytz.UTC) if event_time_start else None
)
self.event_time_end = event_time_end.replace(tzinfo=pytz.UTC) if event_time_end else None
self.default_end_time = default_end_time or datetime.now(pytz.UTC)

def build_end_time(self):
"""Defaults the end_time to the current time in UTC unless a non `None` event_time_end was provided"""
return self.event_time_end or datetime.now(tz=pytz.utc)
return self.event_time_end or self.default_end_time

def build_start_time(self, checkpoint: Optional[datetime]):
"""Create a start time based off the passed in checkpoint.
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ def _execute_microbatch_materialization(
is_incremental=self._is_incremental(model),
event_time_start=getattr(self.config.args, "EVENT_TIME_START", None),
event_time_end=getattr(self.config.args, "EVENT_TIME_END", None),
default_end_time=self.config.invoked_at,
)
end = microbatch_builder.build_end_time()
start = microbatch_builder.build_start_time(end)
Expand Down
32 changes: 32 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os
from datetime import datetime
from unittest import mock

import pytest
import pytz

from dbt.tests.util import (
get_artifact,
Expand Down Expand Up @@ -38,6 +40,11 @@
select * from {{ ref('input_model') }}
"""

microbatch_model_downstream_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('microbatch_model') }}
"""


microbatch_model_ref_render_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
Expand Down Expand Up @@ -657,3 +664,28 @@ def test_run_with_event_time(self, project):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--full-refresh"])
self.assert_row_count(project, "microbatch_model", 3)


class TestMicrbobatchModelsRunWithSameCurrentTime(BaseMicrobatchTest):

@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": input_model_sql,
"microbatch_model.sql": microbatch_model_sql,
"second_microbatch_model.sql": microbatch_model_downstream_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_microbatch(self, project) -> None:
current_time = datetime.now(pytz.UTC)
run_dbt(["run", "--event-time-start", current_time.strftime("%Y-%m-%d")])

run_results = get_artifact(project.project_root, "target", "run_results.json")
microbatch_model_last_batch = run_results["results"][1]["batch_results"]["successful"][-1]
second_microbatch_model_last_batch = run_results["results"][2]["batch_results"][
"successful"
][-1]

# they should have the same last batch because they are using the _same_ "current_time"
assert microbatch_model_last_batch == second_microbatch_model_last_batch
Loading