Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tidy first] move microbatch compilation to .compile method #11063

Merged
merged 2 commits into from
Nov 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 27 additions & 44 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@
hook_ctx: Any,
context_config: Any,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
Expand Down Expand Up @@ -328,9 +327,7 @@

hook_ctx = self.adapter.pre_model_hook(context_config)

return self._execute_model(
hook_ctx, context_config, model, manifest, context, materialization_macro
)
return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)


class MicrobatchModelRunner(ModelRunner):
Expand All @@ -342,10 +339,30 @@
self.relation_exists: bool = False

def compile(self, manifest: Manifest):
# The default compile function is _always_ called. However, we do our
# compilation _later_ in `_execute_microbatch_materialization`. This
# meant the node was being compiled _twice_ for each batch. To get around
# this, we've overridden the default compile method to do nothing
if self.batch_idx is not None:
batch = self.batches[self.batch_idx]

Check warning on line 343 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L342-L343

Added lines #L342 - L343 were not covered by tests

# LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
# TODO: REMOVE before 1.10 GA
self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1]

Check warning on line 348 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L347-L348

Added lines #L347 - L348 were not covered by tests
# Create batch context on model node prior to re-compiling
self.node.batch = BatchContext(

Check warning on line 350 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L350

Added line #L350 was not covered by tests
id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size),
event_time_start=batch[0],
event_time_end=batch[1],
)
# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(

Check warning on line 356 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L356

Added line #L356 was not covered by tests
self.node,
manifest,
{},
split_suffix=MicrobatchBuilder.format_batch_start(
batch[0], self.node.config.batch_size
),
)

# Skips compilation for non-batch runs
return self.node

def set_batch_idx(self, batch_idx: int) -> None:
Expand Down Expand Up @@ -502,7 +519,6 @@
def _execute_microbatch_materialization(
self,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
Expand Down Expand Up @@ -537,25 +553,6 @@
# call materialization_macro to get a batch-level run result
start_time = time.perf_counter()
try:
# LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
# TODO: REMOVE before 1.10 GA
model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
# Create batch context on model node prior to re-compiling
model.batch = BatchContext(
id=MicrobatchBuilder.batch_id(batch[0], model.config.batch_size),
event_time_start=batch[0],
event_time_end=batch[1],
)
# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(
model,
manifest,
{},
split_suffix=MicrobatchBuilder.format_batch_start(
batch[0], model.config.batch_size
),
)
# Update jinja context with batch context members
jinja_context = microbatch_builder.build_jinja_context_for_batch(
incremental_batch=self.relation_exists
Expand Down Expand Up @@ -643,37 +640,23 @@
else:
return False

def _execute_microbatch_model(
def _execute_model(
self,
hook_ctx: Any,
context_config: Any,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
try:
batch_result = self._execute_microbatch_materialization(
model, manifest, context, materialization_macro
model, context, materialization_macro
)
finally:
self.adapter.post_model_hook(context_config, hook_ctx)

return batch_result

def _execute_model(
self,
hook_ctx: Any,
context_config: Any,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
return self._execute_microbatch_model(
hook_ctx, context_config, model, manifest, context, materialization_macro
)


class RunTask(CompileTask):
def __init__(
Expand Down