Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename internal batch_info variable to previous_batch_results #11056

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,15 @@ def resource_class(cls) -> Type[HookNodeResource]:

@dataclass
class ModelNode(ModelResource, CompiledNode):
batch_info: Optional[BatchResults] = None
previous_batch_results: Optional[BatchResults] = None
_has_this: Optional[bool] = None

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
if "_has_this" in dct:
del dct["_has_this"]
if "previous_batch_results" in dct:
del dct["previous_batch_results"]
return dct

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def run(self):
# batch info if there were _no_ successful batches previously. This is
# because passing the batch info _forces_ the microbatch process into
# _incremental_ model, and it may be that we need to be in full refresh
# mode which is only handled if batch_info _isn't_ passed for a node
# mode which is only handled if previous_batch_results _isn't_ passed for a node
batch_map = {
result.unique_id: result.batch_results
for result in self.previous_results.results
Expand Down
12 changes: 6 additions & 6 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@
result.batch_results.failed = sorted(result.batch_results.failed)

# # If retrying, propagate previously successful batches into final result, even thoguh they were not run in this invocation
if self.node.batch_info is not None:
result.batch_results.successful += self.node.batch_info.successful
if self.node.previous_batch_results is not None:
result.batch_results.successful += self.node.previous_batch_results.successful

Check warning on line 449 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L448-L449

Added lines #L448 - L449 were not covered by tests

def _build_succesful_run_batch_result(
self,
Expand Down Expand Up @@ -508,15 +508,15 @@
)

if self.batch_idx is None:
# Note currently (9/30/2024) model.batch_info is only ever _not_ `None`
# Note currently (9/30/2024) model.previous_batch_results is only ever _not_ `None`
# IFF `dbt retry` is being run and the microbatch model had batches which
# failed on the run of the model (which is being retried)
if model.batch_info is None:
if model.previous_batch_results is None:

Check warning on line 514 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L514

Added line #L514 was not covered by tests
end = microbatch_builder.build_end_time()
start = microbatch_builder.build_start_time(end)
batches = microbatch_builder.build_batches(start, end)
else:
batches = model.batch_info.failed
batches = model.previous_batch_results.failed

Check warning on line 519 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L519

Added line #L519 was not covered by tests
# If there is batch info, then don't run as full_refresh and do force is_incremental
# not doing this risks blowing away the work that has already been done
if self._has_relation(model=model):
Expand Down Expand Up @@ -885,7 +885,7 @@
if uid in self.batch_map:
node = self.manifest.ref_lookup.perform_lookup(uid, self.manifest)
if isinstance(node, ModelNode):
node.batch_info = self.batch_map[uid]
node.previous_batch_results = self.batch_map[uid]

Check warning on line 888 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L888

Added line #L888 was not covered by tests

def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
with adapter.connection_named("master"):
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/contracts/graph/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
"deprecation_date",
"defer_relation",
"time_spine",
"batch_info",
"previous_batch_results",
}
)

Expand Down
Loading