Skip to content

Commit

Permalink
Merge branch 'main' into state-modified-source-unrendered-database
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Sep 26, 2024
2 parents 3aede61 + b590045 commit e924bdc
Show file tree
Hide file tree
Showing 36 changed files with 616 additions and 146 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240924-152922.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Added the --inline-direct parameter to 'dbt show'
time: 2024-09-24T15:29:22.874496-04:00
custom:
Author: aranke peterallenwebb
Issue: "10770"
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240925-165002.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enable `retry` support for microbatch models
time: 2024-09-25T16:50:02.105069-05:00
custom:
Author: QMalcolm MichelleArk
Issue: "10624"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240925-160543.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Improve performance of infer primary key
time: 2024-09-25T16:05:43.59536-04:00
custom:
Author: gshank
Issue: "10781"
21 changes: 21 additions & 0 deletions core/dbt/artifacts/schemas/batch_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Tuple

from dbt_common.dataclass_schema import dbtClassMixin

BatchType = Tuple[datetime, datetime]


@dataclass
class BatchResults(dbtClassMixin):
successful: List[BatchType] = field(default_factory=list)
failed: List[BatchType] = field(default_factory=list)

def __add__(self, other: BatchResults) -> BatchResults:
return BatchResults(
successful=self.successful + other.successful,
failed=self.failed + other.failed,
)
2 changes: 2 additions & 0 deletions core/dbt/artifacts/schemas/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class NodeStatus(StrEnum):
Fail = "fail"
Warn = "warn"
Skipped = "skipped"
PartialSuccess = "partial success"
Pass = "pass"
RuntimeErr = "runtime error"

Expand All @@ -63,6 +64,7 @@ class RunStatus(StrEnum):
Success = NodeStatus.Success
Error = NodeStatus.Error
Skipped = NodeStatus.Skipped
PartialSuccess = NodeStatus.PartialSuccess


class TestStatus(StrEnum):
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/artifacts/schemas/run/v5/run.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import copy
import threading
from dataclasses import dataclass, field
Expand All @@ -17,6 +19,7 @@
get_artifact_schema_version,
schema_version,
)
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.artifacts.schemas.results import (
BaseResult,
ExecutionResult,
Expand All @@ -34,6 +37,7 @@ class RunResult(NodeResult):
agate_table: Optional["agate.Table"] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
batch_results: Optional[BatchResults] = None

@property
def skipped(self):
Expand All @@ -51,6 +55,7 @@ def from_node(cls, node: ResultNode, status: RunStatus, message: Optional[str]):
node=node,
adapter_response={},
failures=None,
batch_results=None,
)


Expand All @@ -67,6 +72,7 @@ class RunResultOutput(BaseResult):
compiled: Optional[bool]
compiled_code: Optional[str]
relation_name: Optional[str]
batch_results: Optional[BatchResults] = None


def process_run_result(result: RunResult) -> RunResultOutput:
Expand All @@ -82,6 +88,7 @@ def process_run_result(result: RunResult) -> RunResultOutput:
message=result.message,
adapter_response=result.adapter_response,
failures=result.failures,
batch_results=result.batch_results,
compiled=result.node.compiled if compiled else None, # type:ignore
compiled_code=result.node.compiled_code if compiled else None, # type:ignore
relation_name=result.node.relation_name if compiled else None, # type:ignore
Expand Down
29 changes: 21 additions & 8 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
from click.exceptions import Exit as ClickExit
from click.exceptions import NoSuchOption, UsageError

from dbt.adapters.factory import register_adapter
from dbt.artifacts.schemas.catalog import CatalogArtifact
from dbt.artifacts.schemas.run import RunExecutionResult
from dbt.cli import params as p
from dbt.cli import requires
from dbt.cli.exceptions import DbtInternalException, DbtUsageException
from dbt.cli.requires import setup_manifest
from dbt.contracts.graph.manifest import Manifest
from dbt.mp_context import get_mp_context
from dbt_common.events.base_types import EventMsg


Expand Down Expand Up @@ -354,6 +357,7 @@ def compile(ctx, **kwargs):
@p.select
@p.selector
@p.inline
@p.inline_direct
@p.target_path
@p.threads
@p.vars
Expand All @@ -362,17 +366,26 @@ def compile(ctx, **kwargs):
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
def show(ctx, **kwargs):
"""Generates executable SQL for a named resource or inline query, runs that SQL, and returns a preview of the
results. Does not materialize anything to the warehouse."""
from dbt.task.show import ShowTask

task = ShowTask(
ctx.obj["flags"],
ctx.obj["runtime_config"],
ctx.obj["manifest"],
)
from dbt.task.show import ShowTask, ShowTaskDirect

if ctx.obj["flags"].inline_direct:
# Issue the inline query directly, with no templating. Does not require
# loading the manifest.
register_adapter(ctx.obj["runtime_config"], get_mp_context())
task = ShowTaskDirect(
ctx.obj["flags"],
ctx.obj["runtime_config"],
)
else:
setup_manifest(ctx)
task = ShowTask(
ctx.obj["flags"],
ctx.obj["runtime_config"],
ctx.obj["manifest"],
)

results = task.run()
success = task.interpret_results(results)
Expand Down
6 changes: 6 additions & 0 deletions core/dbt/cli/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,12 @@
help="Pass SQL inline to dbt compile and show",
)

inline_direct = click.option(
"--inline-direct",
envvar=None,
help="Pass SQL inline to dbt show. Do not load the entire project or apply templating.",
)

# `--select` and `--models` are analogous for most commands except `dbt list` for legacy reasons.
# Most CLI arguments should use the combined `select` option that aliases `--models` to `--select`.
# However, if you need to split out these separators (like `dbt ls`), use the `models` and `raw_select` options instead.
Expand Down
47 changes: 25 additions & 22 deletions core/dbt/cli/requires.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,28 +324,7 @@ def wrapper(*args, **kwargs):
ctx = args[0]
assert isinstance(ctx, Context)

req_strs = ["profile", "project", "runtime_config"]
reqs = [ctx.obj.get(dep) for dep in req_strs]

if None in reqs:
raise DbtProjectError("profile, project, and runtime_config required for manifest")

runtime_config = ctx.obj["runtime_config"]

# if a manifest has already been set on the context, don't overwrite it
if ctx.obj.get("manifest") is None:
ctx.obj["manifest"] = parse_manifest(
runtime_config, write_perf_info, write, ctx.obj["flags"].write_json
)
else:
register_adapter(runtime_config, get_mp_context())
adapter = get_adapter(runtime_config)
adapter.set_macro_context_generator(generate_runtime_macro_context)
adapter.set_macro_resolver(ctx.obj["manifest"])
query_header_context = generate_query_header_context(
adapter.config, ctx.obj["manifest"]
)
adapter.connections.set_query_header(query_header_context)
setup_manifest(ctx, write=write, write_perf_info=write_perf_info)
return func(*args, **kwargs)

return update_wrapper(wrapper, func)
Expand All @@ -355,3 +334,27 @@ def wrapper(*args, **kwargs):
if len(args0) == 0:
return outer_wrapper
return outer_wrapper(args0[0])


def setup_manifest(ctx: Context, write: bool = True, write_perf_info: bool = False):
"""Load the manifest and add it to the context."""
req_strs = ["profile", "project", "runtime_config"]
reqs = [ctx.obj.get(dep) for dep in req_strs]

if None in reqs:
raise DbtProjectError("profile, project, and runtime_config required for manifest")

runtime_config = ctx.obj["runtime_config"]

# if a manifest has already been set on the context, don't overwrite it
if ctx.obj.get("manifest") is None:
ctx.obj["manifest"] = parse_manifest(
runtime_config, write_perf_info, write, ctx.obj["flags"].write_json
)
else:
register_adapter(runtime_config, get_mp_context())
adapter = get_adapter(runtime_config)
adapter.set_macro_context_generator(generate_runtime_macro_context) # type: ignore[arg-type]
adapter.set_macro_resolver(ctx.obj["manifest"])
query_header_context = generate_query_header_context(adapter.config, ctx.obj["manifest"]) # type: ignore[attr-defined]
adapter.connections.set_query_header(query_header_context)
3 changes: 3 additions & 0 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.artifacts.schemas.batch_results import BatchType
from dbt.contracts.graph.model_config import UnitTestNodeConfig
from dbt.contracts.graph.node_args import ModelNodeArgs
from dbt.contracts.graph.unparsed import (
Expand Down Expand Up @@ -442,6 +443,8 @@ def resource_class(cls) -> Type[HookNodeResource]:

@dataclass
class ModelNode(ModelResource, CompiledNode):
batches: Optional[List[BatchType]] = None

@classmethod
def resource_class(cls) -> Type[ModelResource]:
return ModelResource
Expand Down
1 change: 1 addition & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1916,6 +1916,7 @@ message EndOfRunSummary {
int32 num_errors = 1;
int32 num_warnings = 2;
bool keyboard_interrupt = 3;
int32 num_partial_success = 4;
}

message EndOfRunSummaryMsg {
Expand Down
136 changes: 68 additions & 68 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,9 @@ def message(self) -> str:
if self.status == "error":
info = "ERROR creating"
status = red(self.status.upper())
elif "PARTIAL SUCCESS" in self.status:
info = "PARTIALLY created"
status = yellow(self.status.upper())
else:
info = "OK created"
status = green(self.status)
Expand Down Expand Up @@ -1860,10 +1863,16 @@ def code(self) -> str:
def message(self) -> str:
error_plural = pluralize(self.num_errors, "error")
warn_plural = pluralize(self.num_warnings, "warning")
partial_success_plural = pluralize(self.num_partial_success, "partial success")

if self.keyboard_interrupt:
message = yellow("Exited because of keyboard interrupt")
elif self.num_errors > 0:
message = red(f"Completed with {error_plural} and {warn_plural}:")
message = red(
f"Completed with {error_plural}, {partial_success_plural}, and {warn_plural}:"
)
elif self.num_partial_success > 0:
message = yellow(f"Completed with {partial_success_plural} and {warn_plural}")
elif self.num_warnings > 0:
message = yellow(f"Completed with {warn_plural}:")
else:
Expand Down
7 changes: 4 additions & 3 deletions core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from datetime import datetime, timedelta
from typing import List, Optional, Tuple
from typing import List, Optional

import pytz

from dbt.artifacts.resources.types import BatchSize
from dbt.artifacts.schemas.batch_results import BatchType
from dbt.contracts.graph.nodes import ModelNode, NodeConfig
from dbt.exceptions import DbtInternalError, DbtRuntimeError

Expand Down Expand Up @@ -68,7 +69,7 @@ def build_start_time(self, checkpoint: Optional[datetime]):

return start

def build_batches(self, start: datetime, end: datetime) -> List[Tuple[datetime, datetime]]:
def build_batches(self, start: datetime, end: datetime) -> List[BatchType]:
"""
Given a start and end datetime, builds a list of batches where each batch is
the size of the model's batch_size.
Expand All @@ -79,7 +80,7 @@ def build_batches(self, start: datetime, end: datetime) -> List[Tuple[datetime,
curr_batch_start, batch_size, 1
)

batches: List[Tuple[datetime, datetime]] = [(curr_batch_start, curr_batch_end)]
batches: List[BatchType] = [(curr_batch_start, curr_batch_end)]
while curr_batch_end <= end:
curr_batch_start = curr_batch_end
curr_batch_end = MicrobatchBuilder.offset_timestamp(curr_batch_start, batch_size, 1)
Expand Down
33 changes: 19 additions & 14 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1150,10 +1150,17 @@ def process_saved_queries(self, config: RuntimeConfig):

def process_model_inferred_primary_keys(self):
"""Processes Model nodes to populate their `primary_key`."""
model_to_generic_test_map: Dict[str, List[GenericTestNode]] = {}
for node in self.manifest.nodes.values():
if not isinstance(node, ModelNode):
continue
generic_tests = self._get_generic_tests_for_model(node)
if node.created_at < self.started_at:
continue
if not model_to_generic_test_map:
model_to_generic_test_map = self.build_model_to_generic_tests_map()
generic_tests: List[GenericTestNode] = []
if node.unique_id in model_to_generic_test_map:
generic_tests = model_to_generic_test_map[node.unique_id]
primary_key = node.infer_primary_key(generic_tests)
node.primary_key = sorted(primary_key)

Expand Down Expand Up @@ -1425,23 +1432,21 @@ def write_perf_info(self, target_path: str):
write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
fire_event(ParsePerfInfoPath(path=path))

def _get_generic_tests_for_model(
self,
model: ModelNode,
) -> List[GenericTestNode]:
def build_model_to_generic_tests_map(self) -> Dict[str, List[GenericTestNode]]:
"""Return a list of generic tests that are attached to the given model, including disabled tests"""
tests = []
model_to_generic_tests_map: Dict[str, List[GenericTestNode]] = {}
for _, node in self.manifest.nodes.items():
if isinstance(node, GenericTestNode) and node.attached_node == model.unique_id:
tests.append(node)
if isinstance(node, GenericTestNode) and node.attached_node:
if node.attached_node not in model_to_generic_tests_map:
model_to_generic_tests_map[node.attached_node] = []
model_to_generic_tests_map[node.attached_node].append(node)
for _, nodes in self.manifest.disabled.items():
for disabled_node in nodes:
if (
isinstance(disabled_node, GenericTestNode)
and disabled_node.attached_node == model.unique_id
):
tests.append(disabled_node)
return tests
if isinstance(disabled_node, GenericTestNode) and disabled_node.attached_node:
if disabled_node.attached_node not in model_to_generic_tests_map:
model_to_generic_tests_map[disabled_node.attached_node] = []
model_to_generic_tests_map[disabled_node.attached_node].append(disabled_node)
return model_to_generic_tests_map


def invalid_target_fail_unless_test(
Expand Down
3 changes: 3 additions & 0 deletions core/dbt/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def _build_run_result(
agate_table=None,
adapter_response=None,
failures=None,
batch_results=None,
):
execution_time = time.time() - start_time
thread_id = threading.current_thread().name
Expand All @@ -242,6 +243,7 @@ def _build_run_result(
agate_table=agate_table,
adapter_response=adapter_response,
failures=failures,
batch_results=batch_results,
)

def error_result(self, node, message, start_time, timing_info):
Expand Down Expand Up @@ -272,6 +274,7 @@ def from_run_result(self, result, start_time, timing_info):
agate_table=result.agate_table,
adapter_response=result.adapter_response,
failures=result.failures,
batch_results=result.batch_results,
)

def compile_and_execute(self, manifest: Manifest, ctx: ExecutionContext):
Expand Down
Loading

0 comments on commit e924bdc

Please sign in to comment.