Skip to content

Commit

Permalink
Put compiled files under dag_id folder & refactor few snippets
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajkoti committed Sep 29, 2024
1 parent fc93e31 commit b57c4ed
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 27 deletions.
43 changes: 28 additions & 15 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,30 @@ def generate_task_or_group(
return task_or_group


def _add_dbt_compile_task(
    nodes: dict[str, DbtNode],
    dag: DAG,
    execution_mode: ExecutionMode,
    task_args: dict[str, Any],
    tasks_map: dict[str, Any],
) -> None:
    """
    Add the dbt compile task to the DAG when running in ``ExecutionMode.AIRFLOW_ASYNC``.

    For any other execution mode this function does nothing. Otherwise it creates a single
    ``DbtCompileAirflowAsyncOperator`` task, registers it in ``tasks_map`` under
    ``dbt_compile_task_id`` and sets it upstream of every node that has no dependencies
    and already has an entry in ``tasks_map``.

    :param nodes: Dictionary mapping dbt nodes (node.unique_id to node)
    :param dag: DAG the compile task is created in
    :param execution_mode: Execution mode; only ``ExecutionMode.AIRFLOW_ASYNC`` triggers any work
    :param task_args: Arguments forwarded to the compile operator
    :param tasks_map: Dictionary mapping dbt nodes (node.unique_id to Airflow task); mutated in place
    """
    if execution_mode != ExecutionMode.AIRFLOW_ASYNC:
        return

    compile_task_metadata = TaskMetadata(
        id=dbt_compile_task_id,
        # Plain string literal: there is nothing to interpolate, so no f-prefix.
        operator_class="cosmos.operators.airflow_async.DbtCompileAirflowAsyncOperator",
        arguments=task_args,
        extra_context={},
    )
    compile_airflow_task = create_airflow_task(compile_task_metadata, dag, task_group=None)
    tasks_map[dbt_compile_task_id] = compile_airflow_task

    for node_id, node in nodes.items():
        # Only nodes without dependencies need the explicit edge; nodes that were not
        # rendered as tasks have no entry in tasks_map and are skipped.
        if not node.depends_on and node_id in tasks_map:
            compile_airflow_task >> tasks_map[node_id]

Check warning on line 277 in cosmos/airflow/graph.py

View check run for this annotation

Codecov / codecov/patch

cosmos/airflow/graph.py#L275-L277

Added lines #L275 - L277 were not covered by tests


def build_airflow_graph(
nodes: dict[str, DbtNode],
dag: DAG, # Airflow-specific - parent DAG where to associate tasks and (optional) task groups
Expand Down Expand Up @@ -333,32 +357,21 @@ def build_airflow_graph(
for leaf_node_id in leaves_ids:
tasks_map[leaf_node_id] >> test_task

if execution_mode == ExecutionMode.AIRFLOW_ASYNC:
compile_task_metadata = TaskMetadata(
id=dbt_compile_task_id,
owner="", # Set appropriate owner if needed
operator_class=f"cosmos.operators.airflow_async.DbtCompileAirflowAsyncOperator",
arguments=task_args,
extra_context={},
)
compile_airflow_task = create_airflow_task(compile_task_metadata, dag, task_group=None)
tasks_map[dbt_compile_task_id] = compile_airflow_task
_add_dbt_compile_task(nodes, dag, execution_mode, task_args, tasks_map)

create_airflow_task_dependencies(nodes, tasks_map, execution_mode)
create_airflow_task_dependencies(nodes, tasks_map)


def create_airflow_task_dependencies(
nodes: dict[str, DbtNode], tasks_map: dict[str, Union[TaskGroup, BaseOperator]], execution_mode: ExecutionMode
nodes: dict[str, DbtNode],
tasks_map: dict[str, Union[TaskGroup, BaseOperator]],
) -> None:
"""
Create the Airflow task dependencies between non-test nodes.
:param nodes: Dictionary mapping dbt nodes (node.unique_id to node)
:param tasks_map: Dictionary mapping dbt nodes (node.unique_id to Airflow task)
"""
for node_id, node in nodes.items():
if not node.depends_on and execution_mode == ExecutionMode.AIRFLOW_ASYNC:
tasks_map[dbt_compile_task_id] >> tasks_map[node_id]

for parent_node_id in node.depends_on:
# depending on the node type, it will not have mapped 1:1 to tasks_map
if (node_id in tasks_map) and (parent_node_id in tasks_map):
Expand Down
5 changes: 0 additions & 5 deletions cosmos/operators/airflow_async.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from typing import Any

Check warning on line 1 in cosmos/operators/airflow_async.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/airflow_async.py#L1

Added line #L1 was not covered by tests

from airflow.utils.context import Context

from cosmos.operators.base import DbtCompileMixin
from cosmos.operators.local import (

Check warning on line 4 in cosmos/operators/airflow_async.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/airflow_async.py#L3-L4

Added lines #L3 - L4 were not covered by tests
DbtBuildLocalOperator,
Expand Down Expand Up @@ -86,6 +84,3 @@ class DbtCompileAirflowAsyncOperator(DbtCompileMixin, DbtLocalBaseOperator):
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialise the operator, always enabling upload of the compiled SQL."""
    # Force the flag on regardless of what the caller passed.
    kwargs.update(should_upload_compiled_sql=True)
    super().__init__(*args, **kwargs)

Check warning on line 86 in cosmos/operators/airflow_async.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/airflow_async.py#L84-L86

Added lines #L84 - L86 were not covered by tests

def execute(self, context: Context) -> None:
self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags())
23 changes: 16 additions & 7 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ def store_compiled_sql(self, tmp_project_dir: str, context: Context, session: Se
self.log.info("Warning: ti is of type TaskInstancePydantic. Cannot update template_fields.")

@staticmethod
def _configure_remote_target_path() -> Path | None:
def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]:
"""Configure the remote target path if it is provided."""
if not remote_target_path:
return None
return None, None

Check warning on line 261 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L260-L261

Added lines #L260 - L261 were not covered by tests

_configured_target_path = None

Check warning on line 263 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L263

Added line #L263 was not covered by tests

Expand All @@ -269,7 +269,7 @@ def _configure_remote_target_path() -> Path | None:
target_path_schema = target_path_str.split("://")[0]
remote_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(target_path_schema, None) # type: ignore[assignment]
if remote_conn_id is None:
return _configured_target_path
return None, None

Check warning on line 272 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L267-L272

Added lines #L267 - L272 were not covered by tests

if not AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(

Check warning on line 275 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L274-L275

Added lines #L274 - L275 were not covered by tests
Expand All @@ -285,7 +285,7 @@ def _configure_remote_target_path() -> Path | None:
if not _configured_target_path.exists(): # type: ignore[no-untyped-call]
_configured_target_path.mkdir(parents=True, exist_ok=True)

Check warning on line 286 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L285-L286

Added lines #L285 - L286 were not covered by tests

return _configured_target_path
return _configured_target_path, remote_conn_id

Check warning on line 288 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L288

Added line #L288 was not covered by tests

def upload_compiled_sql(self, tmp_project_dir: str, context: Context) -> None:
"""
Expand All @@ -294,17 +294,26 @@ def upload_compiled_sql(self, tmp_project_dir: str, context: Context) -> None:
if not self.should_upload_compiled_sql:
return

dest_target_dir = self._configure_remote_target_path()
dest_target_dir, dest_conn_id = self._configure_remote_target_path()
if not dest_target_dir:
raise CosmosValueError(

Check warning on line 299 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L297-L299

Added lines #L297 - L299 were not covered by tests
"You're trying to upload compiled SQL files, but the remote target path is not configured. "
)

from airflow.io.path import ObjectStoragePath

Check warning on line 303 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L303

Added line #L303 was not covered by tests

source_target_dir = ObjectStoragePath(Path(tmp_project_dir) / "target" / "compiled")
source_compiled_dir = Path(tmp_project_dir) / "target" / "compiled"
files = [str(file) for file in source_compiled_dir.rglob("*") if file.is_file()]

Check warning on line 306 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L305-L306

Added lines #L305 - L306 were not covered by tests

source_target_dir.copy(dest_target_dir, recursive=True) # type: ignore[arg-type]
for file_path in files:
rel_path = os.path.relpath(file_path, source_compiled_dir)

Check warning on line 309 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L308-L309

Added lines #L308 - L309 were not covered by tests

dest_path = ObjectStoragePath(

Check warning on line 311 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L311

Added line #L311 was not covered by tests
f"{str(dest_target_dir).rstrip('/')}/{context['dag'].dag_id}/{rel_path.lstrip('/')}",
conn_id=dest_conn_id,
)
ObjectStoragePath(file_path).copy(dest_path)
self.log.debug("Copied %s to %s", file_path, dest_path)

Check warning on line 316 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L315-L316

Added lines #L315 - L316 were not covered by tests

@provide_session
def store_freshness_json(self, tmp_project_dir: str, context: Context, session: Session = NEW_SESSION) -> None:
Expand Down

0 comments on commit b57c4ed

Please sign in to comment.