Skip to content

Commit

Permalink
fix MicrobatchExecutionDebug message
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Nov 28, 2024
1 parent 1b7d9b5 commit 902fea5
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,23 @@ def set_relation_exists(self, relation_exists: bool) -> None:
def set_batches(self, batches: Dict[int, BatchType]) -> None:
self.batches = batches

@property
def batch_start(self) -> Optional[datetime]:
if self.batch_idx is None:
return None

Check warning on line 380 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L380

Added line #L380 was not covered by tests
else:
return self.batches[self.batch_idx][0]

def describe_node(self) -> str:
return f"{self.node.language} microbatch model {self.get_node_representation()}"

def describe_batch(self, batch_start: datetime) -> str:
def describe_batch(self) -> str:
if self.batch_idx is None:
return ""

Check warning on line 389 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L389

Added line #L389 was not covered by tests

# Only visualize date if batch_start year/month/day
formatted_batch_start = MicrobatchBuilder.format_batch_start(
batch_start, self.node.config.batch_size
self.batch_start, self.node.config.batch_size
)
return f"batch {formatted_batch_start} of {self.get_node_representation()}"

Expand All @@ -391,8 +401,7 @@ def print_batch_result_line(
if self.batch_idx is None:
return

batch_start = self.batches[self.batch_idx][0]
description = self.describe_batch(batch_start)
description = self.describe_batch()
group = group_lookup.get(self.node.unique_id)
if result.status == NodeStatus.Error:
status = result.status
Expand Down Expand Up @@ -421,7 +430,7 @@ def print_batch_start_line(self) -> None:
if batch_start is None:
return

batch_description = self.describe_batch(batch_start)
batch_description = self.describe_batch()
fire_event(
LogStartLine(
description=batch_description,
Expand Down Expand Up @@ -724,14 +733,14 @@ def handle_microbatch_model(
if runner._should_run_in_parallel(relation_exists):
fire_event(
MicrobatchExecutionDebug(
msg=f"{batch_runner.describe_batch} is being run concurrently"
msg=f"{batch_runner.describe_batch()} is being run concurrently"
)
)
self._submit(pool, [batch_runner], batch_results.append)
else:
fire_event(
MicrobatchExecutionDebug(
msg=f"{batch_runner.describe_batch} is being run sequentially"
msg=f"{batch_runner.describe_batch()} is being run sequentially"
)
)
batch_results.append(self.call_runner(batch_runner))
Expand Down

0 comments on commit 902fea5

Please sign in to comment.