Skip to content

Commit

Permalink
Set an invocated_at on the RuntimeConfig and plumb to `Microbatch…
Browse files Browse the repository at this point in the history
…Builder`
  • Loading branch information
QMalcolm committed Oct 7, 2024
1 parent fc6e59e commit 12bb2ca
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 2 deletions.
6 changes: 5 additions & 1 deletion core/dbt/config/runtime.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import itertools
import os
from copy import deepcopy
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import (
Any,
Expand All @@ -15,6 +16,8 @@
Type,
)

import pytz

from dbt import tracking
from dbt.adapters.contracts.connection import (
AdapterRequiredConfig,
Expand Down Expand Up @@ -98,6 +101,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
profile_name: str
cli_vars: Dict[str, Any]
dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None
invocated_at: datetime = field(default_factory=lambda: datetime.now(pytz.UTC))

def __post_init__(self):
self.validate()
Expand Down
6 changes: 5 additions & 1 deletion core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(
is_incremental: bool,
event_time_start: Optional[datetime],
event_time_end: Optional[datetime],
batch_current_time: Optional[datetime] = None,
):
if model.config.incremental_strategy != "microbatch":
raise DbtInternalError(
Expand All @@ -35,10 +36,13 @@ def __init__(
event_time_start.replace(tzinfo=pytz.UTC) if event_time_start else None
)
self.event_time_end = event_time_end.replace(tzinfo=pytz.UTC) if event_time_end else None
self.batch_current_time = (
batch_current_time if batch_current_time is not None else datetime.now(pytz.UTC)
)

def build_end_time(self):
"""Defaults the end_time to the current time in UTC unless a non `None` event_time_end was provided"""
return self.event_time_end or datetime.now(tz=pytz.utc)
return self.event_time_end or self.batch_current_time

def build_start_time(self, checkpoint: Optional[datetime]):
"""Create a start time based off the passed in checkpoint.
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ def _execute_microbatch_materialization(
is_incremental=self._is_incremental(model),
event_time_start=getattr(self.config.args, "EVENT_TIME_START", None),
event_time_end=getattr(self.config.args, "EVENT_TIME_END", None),
batch_current_time=self.config.invocated_at,
)
end = microbatch_builder.build_end_time()
start = microbatch_builder.build_start_time(end)
Expand Down

0 comments on commit 12bb2ca

Please sign in to comment.