Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first stab: microbatch #10506

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ class CompiledResource(ParsedResource):
extra_ctes: List[InjectedCTE] = field(default_factory=list)
_pre_injected_sql: Optional[str] = None
contract: Contract = field(default_factory=Contract)
event_time: Optional[str] = None

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
Expand Down
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/source_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,4 @@ class SourceDefinition(ParsedSourceMandatory):
unrendered_config: Dict[str, Any] = field(default_factory=dict)
relation_name: Optional[str] = None
created_at: float = field(default_factory=lambda: time.time())
event_time: Optional[str] = None
41 changes: 37 additions & 4 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from dbt import selected_resources
from dbt.adapters.base.column import Column
from dbt.adapters.base.relation import EventTimeFilter
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.exceptions import MissingConfigError
from dbt.adapters.factory import (
Expand Down Expand Up @@ -230,6 +231,21 @@ def Relation(self):
def resolve_limit(self) -> Optional[int]:
return 0 if getattr(self.config.args, "EMPTY", False) else None

@property
def resolve_event_time_filter(self) -> Optional[EventTimeFilter]:
field_name = getattr(self.model, "event_time")
start_time = getattr(self.model, "start_time")
end_time = getattr(self.model, "end_time")

if start_time and end_time and field_name:
return EventTimeFilter(
field_name=field_name,
start_time=start_time,
end_time=end_time,
)

return None

@abc.abstractmethod
def __call__(self, *args: str) -> Union[str, RelationProxy, MetricReference]:
pass
Expand Down Expand Up @@ -545,7 +561,11 @@ def resolve(
def create_relation(self, target_model: ManifestNode) -> RelationProxy:
if target_model.is_ephemeral_model:
self.model.set_cte(target_model.unique_id, None)
return self.Relation.create_ephemeral_from(target_model, limit=self.resolve_limit)
return self.Relation.create_ephemeral_from(
target_model,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter,
)
elif (
hasattr(target_model, "defer_relation")
and target_model.defer_relation
Expand All @@ -563,10 +583,18 @@ def create_relation(self, target_model: ManifestNode) -> RelationProxy:
)
):
return self.Relation.create_from(
self.config, target_model.defer_relation, limit=self.resolve_limit
self.config,
target_model.defer_relation,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter,
)
else:
return self.Relation.create_from(self.config, target_model, limit=self.resolve_limit)
return self.Relation.create_from(
self.config,
target_model,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter,
)

def validate(
self,
Expand Down Expand Up @@ -633,7 +661,12 @@ def resolve(self, source_name: str, table_name: str):
target_kind="source",
disabled=(isinstance(target_source, Disabled)),
)
return self.Relation.create_from(self.config, target_source, limit=self.resolve_limit)
return self.Relation.create_from(
self.config,
target_source,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter,
)


class RuntimeUnitTestSourceResolver(BaseSourceResolver):
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,10 @@ class CompiledNode(CompiledResource, ParsedNode):
"""Contains attributes necessary for SQL files and nodes with refs, sources, etc,
so all ManifestNodes except SeedNode."""

# TODO: should these go here? and get set during execution?
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None

@property
def empty(self):
return not self.raw_code.strip()
Expand Down
10 changes: 9 additions & 1 deletion core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import functools
import threading
import time
from datetime import datetime
from datetime import datetime, timedelta
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple

from dbt import tracking, utils
Expand Down Expand Up @@ -214,6 +214,14 @@ def print_result_line(self, result):
)

def before_execute(self):
if self.node.config.get("microbatch"):
# TODO: actually use partition_grain
# partition_grain = self.node.config.get("partition_grain")
lookback = self.node.config.get("lookback")
self.node.end_time = datetime.now()
self.node.start_time = self.node.end_time - timedelta(days=lookback)
self.node.start_time.replace(minute=0, hour=0, second=0, microsecond=0)

self.print_start_line()

def after_execute(self, result):
Expand Down
10 changes: 8 additions & 2 deletions tests/unit/context/test_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ def resolver(self):
mock_db_wrapper = mock.Mock()
mock_db_wrapper.Relation = BaseRelation

mock_model = mock.Mock()
mock_model.event_time = None

return RuntimeRefResolver(
db_wrapper=mock_db_wrapper,
model=mock.Mock(),
model=mock_model,
config=mock.Mock(),
manifest=mock.Mock(),
)
Expand Down Expand Up @@ -82,9 +85,12 @@ def resolver(self):
mock_db_wrapper = mock.Mock()
mock_db_wrapper.Relation = BaseRelation

mock_model = mock.Mock()
mock_model.event_time = None

return RuntimeSourceResolver(
db_wrapper=mock_db_wrapper,
model=mock.Mock(),
model=mock_model,
config=mock.Mock(),
manifest=mock.Mock(),
)
Expand Down
3 changes: 3 additions & 0 deletions tests/unit/contracts/graph/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@
"constraints",
"deprecation_date",
"defer_relation",
"event_time",
"start_time",
"end_time",
}
)

Expand Down
Loading