Skip to content

Commit

Permalink
first stab: microbatch
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Jul 23, 2024
1 parent 79ad0a3 commit 2a146bb
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 7 deletions.
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ class CompiledResource(ParsedResource):
extra_ctes: List[InjectedCTE] = field(default_factory=list)
_pre_injected_sql: Optional[str] = None
contract: Contract = field(default_factory=Contract)
event_time: Optional[str] = None

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
Expand Down
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/source_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,4 @@ class SourceDefinition(ParsedSourceMandatory):
unrendered_config: Dict[str, Any] = field(default_factory=dict)
relation_name: Optional[str] = None
created_at: float = field(default_factory=lambda: time.time())
event_time: Optional[str] = None
41 changes: 37 additions & 4 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from dbt import selected_resources
from dbt.adapters.base.column import Column
from dbt.adapters.base.relation import EventTimeFilter
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.exceptions import MissingConfigError
from dbt.adapters.factory import (
Expand Down Expand Up @@ -230,6 +231,21 @@ def Relation(self):
def resolve_limit(self) -> Optional[int]:
return 0 if getattr(self.config.args, "EMPTY", False) else None

@property
def resolve_event_time_filter(self) -> Optional[EventTimeFilter]:
field_name = getattr(self.model, "event_time")
start_time = getattr(self.model, "start_time")
end_time = getattr(self.model, "end_time")

if start_time and end_time and field_name:
return EventTimeFilter(
field_name=field_name,
start_time=start_time,
end_time=end_time,
)

return None

@abc.abstractmethod
def __call__(self, *args: str) -> Union[str, RelationProxy, MetricReference]:
pass
Expand Down Expand Up @@ -545,7 +561,11 @@ def resolve(
def create_relation(self, target_model: ManifestNode) -> RelationProxy:
if target_model.is_ephemeral_model:
self.model.set_cte(target_model.unique_id, None)
return self.Relation.create_ephemeral_from(target_model, limit=self.resolve_limit)
return self.Relation.create_ephemeral_from(
target_model,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter,
)
elif (
hasattr(target_model, "defer_relation")
and target_model.defer_relation
Expand All @@ -563,10 +583,18 @@ def create_relation(self, target_model: ManifestNode) -> RelationProxy:
)
):
return self.Relation.create_from(
self.config, target_model.defer_relation, limit=self.resolve_limit
self.config,
target_model.defer_relation,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter,
)
else:
return self.Relation.create_from(self.config, target_model, limit=self.resolve_limit)
return self.Relation.create_from(
self.config,
target_model,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter,
)

def validate(
self,
Expand Down Expand Up @@ -633,7 +661,12 @@ def resolve(self, source_name: str, table_name: str):
target_kind="source",
disabled=(isinstance(target_source, Disabled)),
)
return self.Relation.create_from(self.config, target_source, limit=self.resolve_limit)
return self.Relation.create_from(
self.config,
target_source,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter,
)


class RuntimeUnitTestSourceResolver(BaseSourceResolver):
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,10 @@ class CompiledNode(CompiledResource, ParsedNode):
"""Contains attributes necessary for SQL files and nodes with refs, sources, etc,
so all ManifestNodes except SeedNode."""

# TODO: should these go here? and get set during execution?
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None

@property
def empty(self):
return not self.raw_code.strip()
Expand Down
10 changes: 9 additions & 1 deletion core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import functools
import threading
import time
from datetime import datetime
from datetime import datetime, timedelta
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple

from dbt import tracking, utils
Expand Down Expand Up @@ -214,6 +214,14 @@ def print_result_line(self, result):
)

def before_execute(self):
if self.node.config.get("microbatch"):
# TODO: actually use partition_grain
# partition_grain = self.node.config.get("partition_grain")
lookback = self.node.config.get("lookback")
self.node.end_time = datetime.now()
self.node.start_time = self.node.end_time - timedelta(days=lookback)
self.node.start_time.replace(minute=0, hour=0, second=0, microsecond=0)

self.print_start_line()

def after_execute(self, result):
Expand Down
10 changes: 8 additions & 2 deletions tests/unit/context/test_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ def resolver(self):
mock_db_wrapper = mock.Mock()
mock_db_wrapper.Relation = BaseRelation

mock_model = mock.Mock()
mock_model.event_time = None

return RuntimeRefResolver(
db_wrapper=mock_db_wrapper,
model=mock.Mock(),
model=mock_model,
config=mock.Mock(),
manifest=mock.Mock(),
)
Expand Down Expand Up @@ -82,9 +85,12 @@ def resolver(self):
mock_db_wrapper = mock.Mock()
mock_db_wrapper.Relation = BaseRelation

mock_model = mock.Mock()
mock_model.event_time = None

return RuntimeSourceResolver(
db_wrapper=mock_db_wrapper,
model=mock.Mock(),
model=mock_model,
config=mock.Mock(),
manifest=mock.Mock(),
)
Expand Down
3 changes: 3 additions & 0 deletions tests/unit/contracts/graph/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@
"constraints",
"deprecation_date",
"defer_relation",
"event_time",
"start_time",
"end_time",
}
)

Expand Down

0 comments on commit 2a146bb

Please sign in to comment.