Skip to content

Commit

Permalink
Merge branch 'main' into custom-selector-upstream-model
Browse files Browse the repository at this point in the history
  • Loading branch information
tseruga authored Aug 2, 2023
2 parents 2588222 + dd553dc commit 8f6f778
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 5 deletions.
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Contains dags, task groups, and operators.
"""
__version__ = "1.0.3"
__version__ = "1.0.4"

from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup
Expand Down
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ class DbtResourceType(Enum):
SNAPSHOT = "snapshot"
SEED = "seed"
TEST = "test"
SOURCE = "source"
2 changes: 1 addition & 1 deletion cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def load_via_dbt_ls(self) -> None:
name=node_dict["name"],
unique_id=node_dict["unique_id"],
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict["depends_on"].get("nodes", []),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=self.project.dir / node_dict["original_file_path"],
tags=node_dict["tags"],
config=node_dict["config"],
Expand Down
13 changes: 13 additions & 0 deletions cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from typing import TYPE_CHECKING

from cosmos.exceptions import CosmosValueError

if TYPE_CHECKING:
from cosmos.dbt.graph import DbtNode

Expand Down Expand Up @@ -181,6 +183,17 @@ def select_nodes(
if not select and not exclude:
return nodes

# validates select and exclude filters
filters = [["select", select], ["exclude", exclude]]
for filter_type, filter in filters:
for filter_parameter in filter:
if filter_parameter.startswith(PATH_SELECTOR) or filter_parameter.startswith(TAG_SELECTOR):
continue
elif any([filter_parameter.startswith(CONFIG_SELECTOR + config + ":") for config in SUPPORTED_CONFIG]):
continue
else:
raise CosmosValueError(f"Invalid {filter_type} filter: {filter_parameter}")

subset_ids: set[str] = set()

for statement in select:
Expand Down
10 changes: 7 additions & 3 deletions tests/dbt/test_selector.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from pathlib import Path

import pytest

from cosmos.constants import DbtResourceType
from cosmos.dbt.graph import DbtNode
from cosmos.dbt.selector import select_nodes
from cosmos.exceptions import CosmosValueError

SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/")

Expand Down Expand Up @@ -106,9 +109,10 @@ def test_select_nodes_by_exclude_tag():
assert selected == expected


def test_select_nodes_by_exclude_unsupported_selector(caplog):
select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, exclude=["unsupported:filter"])
assert "Unsupported select statement: unsupported:filter" in caplog.messages
def test_select_nodes_by_exclude_unsupported_selector():
with pytest.raises(CosmosValueError) as err_info:
assert select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, exclude=["unsupported:filter"])
assert err_info.value.args[0] == "Invalid exclude filter: unsupported:filter"


def test_select_nodes_by_select_union_exclude_tags():
Expand Down

0 comments on commit 8f6f778

Please sign in to comment.