Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure run results artifact get written during "after run hooks" #10941

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20241029-182034.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Handle exceptions in `get_execution_status` more broadly to better ensure `run_results.json`
gets written
time: 2024-10-29T18:20:34.782845-05:00
custom:
Author: QMalcolm
Issue: "10934"
28 changes: 24 additions & 4 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,16 @@
response, _ = adapter.execute(sql, auto_begin=False, fetch=False)
status = RunStatus.Success
message = response._message
except (KeyboardInterrupt, SystemExit):
raise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checking my own understanding -- these are eventually handled further along, here: https://github.com/dbt-labs/dbt-core/pull/10941/files#diff-299afe4cac0de7af89561129dd7c55a07bd53c06baafff397b51b460bc631bceR804?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct!

except DbtRuntimeError as exc:
status = RunStatus.Error
message = exc.msg
finally:
return status, message
except Exception as exc:
status = RunStatus.Error
message = str(exc)

return (status, message)


def _get_adapter_info(adapter, run_model_result) -> Dict[str, Any]:
Expand Down Expand Up @@ -792,8 +797,23 @@
], # exclude that didn't fail to preserve backwards compatibility
"database_schemas": list(database_schema_set),
}
with adapter.connection_named("master"):
self.safe_run_hooks(adapter, RunHookType.End, extras)

try:
with adapter.connection_named("master"):
self.safe_run_hooks(adapter, RunHookType.End, extras)
except (KeyboardInterrupt, SystemExit):
run_result = self.get_result(

Check warning on line 805 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L804-L805

Added lines #L804 - L805 were not covered by tests
results=self.node_results,
elapsed_time=time.time() - self.started_at,
generated_at=datetime.utcnow(),
)

if self.args.write_json and hasattr(run_result, "write"):
run_result.write(self.result_path())

Check warning on line 812 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L811-L812

Added lines #L811 - L812 were not covered by tests

print_run_end_messages(self.node_results, keyboard_interrupt=True)

Check warning on line 814 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L814

Added line #L814 was not covered by tests

raise

Check warning on line 816 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L816

Added line #L816 was not covered by tests

def get_node_selector(self) -> ResourceTypeSelector:
if self.manifest is None or self.graph is None:
Expand Down
94 changes: 92 additions & 2 deletions tests/unit/task/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,29 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from importlib import import_module
from typing import Optional
from typing import Optional, Type, Union
from unittest import mock
from unittest.mock import MagicMock, patch

import pytest
from psycopg2 import DatabaseError
from pytest_mock import MockerFixture

from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.postgres import PostgresAdapter
from dbt.artifacts.resources.base import FileHash
from dbt.artifacts.resources.types import NodeType, RunHookType
from dbt.artifacts.resources.v1.components import DependsOn
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt.artifacts.resources.v1.model import ModelConfig
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.artifacts.schemas.results import RunStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.config.runtime import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.graph.nodes import HookNode, ModelNode
from dbt.events.types import LogModelResult
from dbt.exceptions import DbtRuntimeError
from dbt.flags import get_flags, set_from_args
from dbt.task.run import ModelRunner, RunTask, _get_adapter_info
from dbt.tests.util import safe_set_invocation_context
Expand Down Expand Up @@ -256,3 +264,85 @@ class Relation:

# Assert result of _is_incremental
assert model_runner._is_incremental(model) == expectation


class TestRunTask:
@pytest.fixture
def hook_node(self) -> HookNode:
return HookNode(
package_name="test",
path="/root/x/path.sql",
original_file_path="/root/path.sql",
language="sql",
raw_code="select * from wherever",
name="foo",
resource_type=NodeType.Operation,
unique_id="model.test.foo",
fqn=["test", "models", "foo"],
refs=[],
sources=[],
metrics=[],
depends_on=DependsOn(),
description="",
database="test_db",
schema="test_schema",
alias="bar",
tags=[],
config=NodeConfig(),
index=None,
checksum=FileHash.from_contents(""),
unrendered_config={},
)

@pytest.mark.parametrize(
"error_to_raise,expected_result",
[
(None, RunStatus.Success),
(DbtRuntimeError, RunStatus.Error),
(DatabaseError, RunStatus.Error),
(KeyboardInterrupt, KeyboardInterrupt),
],
)
def test_safe_run_hooks(
self,
mocker: MockerFixture,
runtime_config: RuntimeConfig,
manifest: Manifest,
hook_node: HookNode,
error_to_raise: Optional[Type[Exception]],
expected_result: Union[RunStatus, Type[Exception]],
):
mocker.patch("dbt.task.run.RunTask.get_hooks_by_type").return_value = [hook_node]
mocker.patch("dbt.task.run.RunTask.get_hook_sql").return_value = hook_node.raw_code

flags = mock.Mock()
flags.state = None
flags.defer_state = None

run_task = RunTask(
args=flags,
config=runtime_config,
manifest=manifest,
)

adapter = mock.Mock()
adapter_execute = mock.Mock()
adapter_execute.return_value = (AdapterResponse(_message="Success"), None)

if error_to_raise:
adapter_execute.side_effect = error_to_raise("Oh no!")

adapter.execute = adapter_execute

try:
result = run_task.safe_run_hooks(
adapter=adapter,
hook_type=RunHookType.End,
extra_context={},
)
assert isinstance(expected_result, RunStatus)
assert result == expected_result
except BaseException as e:
assert not isinstance(expected_result, RunStatus)
assert issubclass(expected_result, BaseException)
assert type(e) == expected_result
Loading