diff --git a/.changes/unreleased/Features-20240920-172419.yaml b/.changes/unreleased/Features-20240920-172419.yaml new file mode 100644 index 00000000000..1647d48f1da --- /dev/null +++ b/.changes/unreleased/Features-20240920-172419.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Write microbatch compiled/run targets to separate files, one per batch +time: 2024-09-20T17:24:19.219556+01:00 +custom: + Author: michelleark + Issue: "10714" diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index 72a5b6f016b..0ffa73df715 100644 --- a/core/dbt/compilation.py +++ b/core/dbt/compilation.py @@ -521,7 +521,9 @@ def write_graph_file(self, linker: Linker, manifest: Manifest): linker.write_graph(graph_path, manifest) # writes the "compiled_code" into the target/compiled directory - def _write_node(self, node: ManifestSQLNode) -> ManifestSQLNode: + def _write_node( + self, node: ManifestSQLNode, split_suffix: Optional[str] = None + ) -> ManifestSQLNode: if not node.extra_ctes_injected or node.resource_type in ( NodeType.Snapshot, NodeType.Seed, @@ -530,7 +532,9 @@ def _write_node(self, node: ManifestSQLNode) -> ManifestSQLNode: fire_event(WritingInjectedSQLForNode(node_info=get_node_info())) if node.compiled_code: - node.compiled_path = node.get_target_write_path(self.config.target_path, "compiled") + node.compiled_path = node.get_target_write_path( + self.config.target_path, "compiled", split_suffix + ) node.write_node(self.config.project_root, node.compiled_path, node.compiled_code) return node @@ -540,6 +544,7 @@ def compile_node( manifest: Manifest, extra_context: Optional[Dict[str, Any]] = None, write: bool = True, + split_suffix: Optional[str] = None, ) -> ManifestSQLNode: """This is the main entry point into this code. It's called by CompileRunner.compile, GenericRPCRunner.compile, and @@ -562,7 +567,7 @@ def compile_node( node, _ = self._recursively_prepend_ctes(node, manifest, extra_context) if write: - self._write_node(node) + self._write_node(node, split_suffix=split_suffix) return node diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 898437bf4da..5e2a4d14b2e 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -51,6 +51,7 @@ Exposure, Macro, ManifestNode, + ModelNode, Resource, SeedNode, SemanticModel, @@ -77,6 +78,7 @@ SecretEnvVarLocationError, TargetNotFoundError, ) +from dbt.materializations.incremental.microbatch import MicrobatchBuilder from dbt.node_types import ModelLanguage, NodeType from dbt.utils import MultiDict, args_to_dict from dbt_common.clients.jinja import MacroProtocol @@ -972,7 +974,20 @@ def write(self, payload: str) -> str: # macros/source defs aren't 'writeable'. if isinstance(self.model, (Macro, SourceDefinition)): raise MacrosSourcesUnWriteableError(node=self.model) - self.model.build_path = self.model.get_target_write_path(self.config.target_path, "run") + + split_suffix = None + if ( + isinstance(self.model, ModelNode) + and self.model.config.get("incremental_strategy") == "microbatch" + ): + split_suffix = MicrobatchBuilder.format_batch_start( + self.model.config.get("__dbt_internal_microbatch_event_time_start"), + self.model.config.batch_size, + ) + + self.model.build_path = self.model.get_target_write_path( + self.config.target_path, "run", split_suffix=split_suffix + ) self.model.write_node(self.config.project_root, self.model.build_path, payload) return "" diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index b28910c0de3..13c3df0e2e7 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -2,6 +2,7 @@ import os from dataclasses import dataclass, field from datetime import datetime +from pathlib import Path from typing import ( Any, Dict, @@ -243,7 +244,9 @@ def clear_event_status(self): @dataclass class ParsedNode(ParsedResource, NodeInfoMixin, ParsedNodeMandatory, SerializableType): - def get_target_write_path(self, target_path: str, subdirectory: str): + def get_target_write_path( + self, target_path: str, subdirectory: str, split_suffix: Optional[str] = None + ): # This is called for both the "compiled" subdirectory of "target" and the "run" subdirectory if os.path.basename(self.path) == os.path.basename(self.original_file_path): # One-to-one relationship of nodes to files. @@ -251,6 +254,15 @@ def get_target_write_path(self, target_path: str, subdirectory: str): else: # Many-to-one relationship of nodes to files. path = os.path.join(self.original_file_path, self.path) + + if split_suffix: + pathlib_path = Path(path) + path = str( + pathlib_path.parent + / pathlib_path.stem + / (pathlib_path.stem + f"_{split_suffix}" + pathlib_path.suffix) + ) + target_write_path = os.path.join(target_path, subdirectory, self.package_name, path) return target_write_path diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index 5bd46eae5e9..4f538529d2d 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -162,3 +162,14 @@ def truncate_timestamp(timestamp: datetime, batch_size: BatchSize): truncated = datetime(timestamp.year, 1, 1, 0, 0, 0, 0, pytz.utc) return truncated + + @staticmethod + def format_batch_start( + batch_start: Optional[datetime], batch_size: BatchSize + ) -> Optional[str]: + if batch_start is None: + return batch_start + + return str( + batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start + ) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index e6e380b4063..70db4d52920 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -14,7 +14,6 @@ ) from dbt.adapters.exceptions import MissingMaterializationError from dbt.artifacts.resources import Hook -from dbt.artifacts.resources.types import BatchSize from dbt.artifacts.schemas.results import ( BaseResult, NodeStatus, @@ -197,11 +196,10 @@ def describe_node(self) -> str: def describe_batch(self, batch_start: Optional[datetime]) -> str: # Only visualize date if batch_start year/month/day - formatted_batch_start = ( - batch_start.date() - if (batch_start and self.node.config.batch_size != BatchSize.hour) - else batch_start + formatted_batch_start = MicrobatchBuilder.format_batch_start( + batch_start, self.node.config.batch_size ) + return f"batch {formatted_batch_start} of {self.get_node_representation()}" def print_start_line(self): @@ -463,7 +461,14 @@ def _execute_microbatch_materialization( model.config["__dbt_internal_microbatch_event_time_end"] = batch[1] # Recompile node to re-resolve refs with event time filters rendered, update context - self.compiler.compile_node(model, manifest, {}) + self.compiler.compile_node( + model, + manifest, + {}, + split_suffix=MicrobatchBuilder.format_batch_start( + batch[0], model.config.batch_size + ), + ) context["model"] = model context["sql"] = model.compiled_code context["compiled_code"] = model.compiled_code diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index cf8e018727f..e38c60f4247 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -5,6 +5,7 @@ from dbt.tests.util import ( patch_microbatch_end_time, + read_file, relation_from_name, run_dbt, run_dbt_and_capture, @@ -442,3 +443,78 @@ def test_run_with_event_time(self, project): with patch_microbatch_end_time("2020-01-03 13:57:00"): run_dbt(["run", "--event-time-start", "2020-01-01"]) self.assert_row_count(project, "microbatch_model", 2) + + +class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest): + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # run all partitions from start - 2 expected rows in output, one failed + with patch_microbatch_end_time("2020-01-03 13:57:00"): + run_dbt(["run", "--event-time-start", "2020-01-01"]) + + # Compiled paths - compiled model without filter only + assert read_file( + project.project_root, + "target", + "compiled", + "test", + "models", + "microbatch_model.sql", + ) + + # Compiled paths - batch compilations + assert read_file( + project.project_root, + "target", + "compiled", + "test", + "models", + "microbatch_model", + "microbatch_model_2020-01-01.sql", + ) + assert read_file( + project.project_root, + "target", + "compiled", + "test", + "models", + "microbatch_model", + "microbatch_model_2020-01-02.sql", + ) + assert read_file( + project.project_root, + "target", + "compiled", + "test", + "models", + "microbatch_model", + "microbatch_model_2020-01-03.sql", + ) + + assert read_file( + project.project_root, + "target", + "run", + "test", + "models", + "microbatch_model", + "microbatch_model_2020-01-01.sql", + ) + assert read_file( + project.project_root, + "target", + "run", + "test", + "models", + "microbatch_model", + "microbatch_model_2020-01-02.sql", + ) + assert read_file( + project.project_root, + "target", + "run", + "test", + "models", + "microbatch_model", + "microbatch_model_2020-01-03.sql", + ) diff --git a/tests/unit/graph/test_nodes.py b/tests/unit/graph/test_nodes.py index 79522d06427..a0e0a8d7e56 100644 --- a/tests/unit/graph/test_nodes.py +++ b/tests/unit/graph/test_nodes.py @@ -15,7 +15,7 @@ ) from dbt.artifacts.resources.v1.semantic_model import NodeRelation from dbt.contracts.graph.model_config import TestConfig -from dbt.contracts.graph.nodes import ColumnInfo, ModelNode, SemanticModel +from dbt.contracts.graph.nodes import ColumnInfo, ModelNode, ParsedNode, SemanticModel from dbt.node_types import NodeType from dbt_common.contracts.constraints import ( ColumnLevelConstraint, @@ -391,3 +391,35 @@ def test_disabled_unique_combo_multiple(): def assertSameContents(list1, list2): assert sorted(list1) == sorted(list2) + + +class TestParsedNode: + @pytest.fixture(scope="class") + def parsed_node(self) -> ParsedNode: + return ParsedNode( + resource_type=NodeType.Model, + unique_id="model.test_package.test_name", + name="test_name", + package_name="test_package", + schema="test_schema", + alias="test_alias", + fqn=["models", "test_name"], + original_file_path="test_original_file_path", + checksum=FileHash.from_contents("checksum"), + path="test_path.sql", + database=None, + ) + + def test_get_target_write_path(self, parsed_node): + write_path = parsed_node.get_target_write_path("target_path", "subdirectory") + assert ( + write_path + == "target_path/subdirectory/test_package/test_original_file_path/test_path.sql" + ) + + def test_get_target_write_path_split(self, parsed_node): + write_path = parsed_node.get_target_write_path("target_path", "subdirectory", "split") + assert ( + write_path + == "target_path/subdirectory/test_package/test_original_file_path/test_path/test_path_split.sql" + ) diff --git a/tests/unit/materializations/incremental/test_microbatch.py b/tests/unit/materializations/incremental/test_microbatch.py index 68521a84e1e..5a5f445a104 100644 --- a/tests/unit/materializations/incremental/test_microbatch.py +++ b/tests/unit/materializations/incremental/test_microbatch.py @@ -444,3 +444,19 @@ def test_offset_timestamp(self, timestamp, batch_size, offset, expected_timestam ) def test_truncate_timestamp(self, timestamp, batch_size, expected_timestamp): assert MicrobatchBuilder.truncate_timestamp(timestamp, batch_size) == expected_timestamp + + @pytest.mark.parametrize( + "batch_size,batch_start,expected_formatted_batch_start", + [ + (None, None, None), + (BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"), + (BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"), + (BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"), + (BatchSize.hour, datetime(2020, 1, 1, 1), "2020-01-01 01:00:00"), + ], + ) + def test_format_batch_start(self, batch_size, batch_start, expected_formatted_batch_start): + assert ( + MicrobatchBuilder.format_batch_start(batch_start, batch_size) + == expected_formatted_batch_start + )