Skip to content

Commit

Permalink
first pass: split_suffix
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Sep 19, 2024
1 parent 3308a43 commit 199ebf4
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 9 deletions.
11 changes: 8 additions & 3 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,9 @@ def write_graph_file(self, linker: Linker, manifest: Manifest):
linker.write_graph(graph_path, manifest)

# writes the "compiled_code" into the target/compiled directory
def _write_node(self, node: ManifestSQLNode) -> ManifestSQLNode:
def _write_node(
self, node: ManifestSQLNode, split_suffix: Optional[str] = None
) -> ManifestSQLNode:
if not node.extra_ctes_injected or node.resource_type in (
NodeType.Snapshot,
NodeType.Seed,
Expand All @@ -530,7 +532,9 @@ def _write_node(self, node: ManifestSQLNode) -> ManifestSQLNode:
fire_event(WritingInjectedSQLForNode(node_info=get_node_info()))

if node.compiled_code:
node.compiled_path = node.get_target_write_path(self.config.target_path, "compiled")
node.compiled_path = node.get_target_write_path(
self.config.target_path, "compiled", split_suffix
)
node.write_node(self.config.project_root, node.compiled_path, node.compiled_code)
return node

Expand All @@ -540,6 +544,7 @@ def compile_node(
manifest: Manifest,
extra_context: Optional[Dict[str, Any]] = None,
write: bool = True,
split_suffix: Optional[str] = None,
) -> ManifestSQLNode:
"""This is the main entry point into this code. It's called by
CompileRunner.compile, GenericRPCRunner.compile, and
Expand All @@ -562,7 +567,7 @@ def compile_node(

node, _ = self._recursively_prepend_ctes(node, manifest, extra_context)
if write:
self._write_node(node)
self._write_node(node, split_suffix=split_suffix)
return node


Expand Down
14 changes: 13 additions & 1 deletion core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import (
Any,
Dict,
Expand Down Expand Up @@ -243,14 +244,25 @@ def clear_event_status(self):

@dataclass
class ParsedNode(ParsedResource, NodeInfoMixin, ParsedNodeMandatory, SerializableType):
def get_target_write_path(self, target_path: str, subdirectory: str):
def get_target_write_path(
self, target_path: str, subdirectory: str, split_suffix: Optional[str] = None
):
# This is called for both the "compiled" subdirectory of "target" and the "run" subdirectory
if os.path.basename(self.path) == os.path.basename(self.original_file_path):
# One-to-one relationship of nodes to files.
path = self.original_file_path
else:
# Many-to-one relationship of nodes to files.
path = os.path.join(self.original_file_path, self.path)

if split_suffix:
pathlib_path = Path(path)
path = str(

Check warning on line 260 in core/dbt/contracts/graph/nodes.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/contracts/graph/nodes.py#L259-L260

Added lines #L259 - L260 were not covered by tests
pathlib_path.parent
/ pathlib_path.stem
/ (pathlib_path.stem + f"_{split_suffix}" + pathlib_path.suffix)
)

target_write_path = os.path.join(target_path, subdirectory, self.package_name, path)
return target_write_path

Expand Down
29 changes: 24 additions & 5 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,19 @@
import os
import threading
import time
from datetime import datetime
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type
from datetime import date, datetime
from typing import (
AbstractSet,
Any,
Dict,
Iterable,
List,
Optional,
Set,
Tuple,
Type,
Union,
)

from dbt import tracking, utils
from dbt.adapters.base import BaseRelation
Expand Down Expand Up @@ -197,12 +208,18 @@ def describe_node(self) -> str:

def describe_batch(self, batch_start: Optional[datetime]) -> str:
# Only visualize date if batch_start year/month/day
formatted_batch_start = (
formatted_batch_start = self.format_batch_start(batch_start)

Check warning on line 211 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L211

Added line #L211 was not covered by tests

return f"batch {formatted_batch_start} of {self.get_node_representation()}"

Check warning on line 213 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L213

Added line #L213 was not covered by tests

def format_batch_start(
self, batch_start: Optional[datetime]
) -> Optional[Union[date, datetime]]:
return (

Check warning on line 218 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L218

Added line #L218 was not covered by tests
batch_start.date()
if (batch_start and self.node.config.batch_size != BatchSize.hour)
else batch_start
)
return f"batch {formatted_batch_start} of {self.get_node_representation()}"

def print_start_line(self):
fire_event(
Expand Down Expand Up @@ -463,7 +480,9 @@ def _execute_microbatch_materialization(
model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]

# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(model, manifest, {})
self.compiler.compile_node(

Check warning on line 483 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L483

Added line #L483 was not covered by tests
model, manifest, {}, split_suffix=str(self.format_batch_start(batch[0]))
)
context["model"] = model
context["sql"] = model.compiled_code
context["compiled_code"] = model.compiled_code
Expand Down

0 comments on commit 199ebf4

Please sign in to comment.