Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ExecutionMode.KUBERNETES on Cosmos 1.x #554

Merged
merged 4 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ def build_airflow_graph(
and test_behavior == TestBehavior.AFTER_EACH
and node.has_test is True
)

task_meta = create_task_metadata(
node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group
)
Expand Down
1 change: 0 additions & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ def __init__(
"profile_config": profile_config,
"emit_datasets": emit_datasets,
}

if dbt_executable_path:
task_args["dbt_executable_path"] = dbt_executable_path

Expand Down
3 changes: 3 additions & 0 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class DbtBaseOperator(BaseOperator):
:param cache_selected_only:
:param no_version_check: dbt optional argument - If set, skip ensuring dbt's version matches the one specified in
the dbt_project.yml file ('require-dbt-version')
:param emit_datasets: Enable emitting inlets and outlets during task execution
:param fail_fast: dbt optional argument to make dbt exit immediately if a single resource fails to build.
:param quiet: dbt optional argument to show only error logs in stdout
:param warn_error: dbt optional argument to convert dbt warnings into errors
Expand Down Expand Up @@ -87,6 +88,7 @@ def __init__(
selector: str | None = None,
vars: dict[str, str] | None = None,
models: str | None = None,
emit_datasets: bool = True,
cache_selected_only: bool = False,
no_version_check: bool = False,
fail_fast: bool = False,
Expand All @@ -112,6 +114,7 @@ def __init__(
self.selector = selector
self.vars = vars
self.models = models
self.emit_datasets = emit_datasets
self.cache_selected_only = cache_selected_only
self.no_version_check = no_version_check
self.fail_fast = fail_fast
Expand Down
3 changes: 3 additions & 0 deletions cosmos/operators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None)
if self.profile_config.target_name:
dbt_cmd.extend(["--target", self.profile_config.target_name])

if self.project_dir:
dbt_cmd.extend(["--project-dir", str(self.project_dir)])

# set env vars
self.build_env_args(env_vars)
self.arguments = dbt_cmd
Expand Down
3 changes: 1 addition & 2 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class DbtLocalBaseOperator(DbtBaseOperator):
:param profile_name: A name to use for the dbt profile. If not provided, and no profile target is found
in your project's dbt_project.yml, "cosmos_profile" is used.
:param install_deps: If true, install dependencies before running the command
:param install_deps: If true, the operator will set inlets and outlets
:param callback: A callback function called on after a dbt run with a path to the dbt project directory.
:param target_name: A name to use for the dbt target. If not provided, and no target is found
in your project's dbt_project.yml, "cosmos_target" is used.
Expand All @@ -99,15 +100,13 @@ def __init__(
install_deps: bool = False,
callback: Callable[[str], None] | None = None,
should_store_compiled_sql: bool = True,
emit_datasets: bool = True,
**kwargs: Any,
) -> None:
self.profile_config = profile_config
self.install_deps = install_deps
self.callback = callback
self.compiled_sql = ""
self.should_store_compiled_sql = should_store_compiled_sql
self.emit_datasets = emit_datasets
self.openlineage_events_completes: list[RunEvent] = []
super().__init__(**kwargs)

Expand Down
Binary file modified docs/_static/jaffle_shop_k8s_dag_run.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
29 changes: 18 additions & 11 deletions docs/getting_started/kubernetes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,27 @@ For instance,

.. code-block:: text

DbtTaskGroup(
...
run_models = DbtTaskGroup(
profile_config=ProfileConfig(
profile_name="postgres_profile",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="postgres_default",
profile_args={
"schema": "public",
},
),
),
project_config=ProjectConfig(PROJECT_DIR),
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.KUBERNETES,
),
operator_args={
"queue": "kubernetes",
"image": "dbt-jaffle-shop:1.0.0",
"image_pull_policy": "Always",
"image": DBT_IMAGE,
"get_logs": True,
"is_delete_operator_pod": False,
"namespace": "default",
"env_vars": {
...
},
"secrets": [postgres_password_secret, postgres_host_secret],
},
execution_mode="kubernetes",
)

Step-by-step instructions
Expand All @@ -53,7 +60,7 @@ Using installed `Kind <https://kind.sigs.k8s.io/>`_, you can setup a local kuber

.. code-block:: bash

kind cluster create
kind create cluster

Deploy a Postgres pod to Kind using `Helm <https://helm.sh/docs/helm/helm_install/>`_

Expand Down
4 changes: 4 additions & 0 deletions tests/operators/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ def test_dbt_kubernetes_build_command():
"end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
"start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
"--no-version-check",
"--project-dir",
"my/dir",
]


Expand Down Expand Up @@ -150,6 +152,8 @@ def test_created_pod(test_hook):
"data_interval_start.strftime(''%Y%m%d%H%M%S'') "
"}}'\n",
"--no-version-check",
"--project-dir",
"my/dir",
],
"command": [],
"env": [{"name": "FOO", "value": "BAR", "value_from": None}],
Expand Down
12 changes: 12 additions & 0 deletions tests/sample/profiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
default:
target: dev
outputs:
dev:
type: postgres
host: "localhost"
user: "postgres"
password: "postgres"
port: 5432
dbname: "postgres"
schema: "public"
threads: 4
53 changes: 52 additions & 1 deletion tests/test_converter.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
from pathlib import Path

from unittest.mock import patch
import pytest

from cosmos.converter import DbtToAirflowConverter, validate_arguments
from cosmos.constants import DbtResourceType, ExecutionMode
from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.dbt.graph import DbtNode
from cosmos.exceptions import CosmosValueError


from cosmos.converter import validate_arguments
SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml"
SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/"


@pytest.mark.parametrize("argument_key", ["tags", "paths"])
Expand All @@ -16,3 +25,45 @@ def test_validate_arguments_tags(argument_key):
validate_arguments(select, exclude, profile_args, task_args)
expected = f"Can't specify the same {selector_name} in `select` and `exclude`: {{'b'}}"
assert err.value.args[0] == expected


parent_seed = DbtNode(
name="seed_parent",
unique_id="seed_parent",
resource_type=DbtResourceType.SEED,
depends_on=[],
file_path="",
)
nodes = {"seed_parent": parent_seed}


@pytest.mark.parametrize(
"execution_mode,operator_args",
[
(ExecutionMode.KUBERNETES, {}),
# (ExecutionMode.DOCKER, {"image": "sample-image"}),
],
)
@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes)
@patch("cosmos.converter.DbtGraph.load")
def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, operator_args):
"""
This test will raise exceptions if we are trying to pass incorrect arguments to operator constructors.
"""
project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
execution_config = ExecutionConfig(execution_mode=execution_mode)
render_config = RenderConfig(emit_datasets=True)
profile_config = ProfileConfig(
profile_name="my_profile_name",
target_name="my_target_name",
profiles_yml_filepath=SAMPLE_PROFILE_YML,
)
converter = DbtToAirflowConverter(
nodes=nodes,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
operator_args=operator_args,
)
assert converter
Loading