Skip to content

Commit

Permalink
fix: Update state when running from temporary directory
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Nov 9, 2021
1 parent e63f89c commit e951979
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
2 changes: 1 addition & 1 deletion airflow_dbt_python/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
__author__ = "Tomás Farías Santana"
__copyright__ = "Copyright 2021 Tomás Farías Santana"
__title__ = "airflow-dbt-python"
__version__ = "0.8.0"
__version__ = "0.8.1"
8 changes: 8 additions & 0 deletions airflow_dbt_python/operators/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ def dbt_directory(self) -> Iterator[str]:

with TemporaryDirectory(prefix="airflowtmp") as tmp_dir:
self.prepare_directory(tmp_dir)

if getattr(self, "state", None) is not None:
state = Path(getattr(self, "state", ""))
# Since we are running in a temporary directory, we need to make
# state paths relative to this temporary directory.
if not state.is_absolute():
setattr(self, "state", str(Path(tmp_dir) / state))

yield tmp_dir

self.profiles_dir = store_profiles_dir
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "airflow-dbt-python"
version = "0.8.0"
version = "0.8.1"
description = "A dbt operator for Airflow that uses the dbt Python package"
authors = ["Tomás Farías Santana <[email protected]>"]
license = "MIT"
Expand Down
47 changes: 43 additions & 4 deletions tests/test_dbt_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,25 +125,59 @@ def test_prepare_args_with_positional():

@no_s3_hook
def test_dbt_base_dbt_directory():
"""Test dbt_directory yields a temporary directory"""
"""Test dbt_directory yields a temporary directory."""
op = DbtBaseOperator(
task_id="dbt_task",
project_dir="/path/to/project/",
profiles_dir="/path/to/profiles/",
)
op.state = "target/"

with op.dbt_directory() as tmp_dir:
assert Path(tmp_dir).exists()
assert op.project_dir == "/path/to/project/"
assert op.profiles_dir == "/path/to/profiles/"
assert op.state == f"{tmp_dir}/target"


@no_s3_hook
def test_dbt_base_dbt_directory_with_absolute_state():
"""Test dbt_directory does not alter state when not needed."""
op = DbtBaseOperator(
task_id="dbt_task",
project_dir="/path/to/project/",
profiles_dir="/path/to/profiles/",
)
op.state = "/absolute/path/to/target/"

with op.dbt_directory() as tmp_dir:
assert Path(tmp_dir).exists()
assert op.state == "/absolute/path/to/target/"


@no_s3_hook
def test_dbt_base_dbt_directory_with_no_state():
"""Test dbt_directory does not alter state when not needed."""
op = DbtBaseOperator(
task_id="dbt_task",
project_dir="/path/to/project/",
profiles_dir="/path/to/profiles/",
)

with op.dbt_directory() as tmp_dir:
assert Path(tmp_dir).exists()
assert getattr(op, "state", None) is None


@no_s3_hook
def test_dbt_base_dbt_directory_changed_to_s3(
dbt_project_file, profiles_file, s3_bucket
):
"""
Test dbt_directory yields a temporary directory and updates profile_dir and
profiles_dir when files have been pulled from s3
"""Test dbt_directory yields a temporary directory and updates attributes.
Certain attributes, like project_dir, profiles_dir, and state, need to be updated to
work once a temporary directory has been created, in particular, when pulling from
S3.
"""
hook = DbtS3Hook()
bucket = hook.get_bucket(s3_bucket)
Expand All @@ -161,11 +195,16 @@ def test_dbt_base_dbt_directory_changed_to_s3(
project_dir=f"s3://{s3_bucket}/dbt/project/",
profiles_dir=f"s3://{s3_bucket}/dbt/profiles/",
)
op.state = "target/"

with op.dbt_directory() as tmp_dir:
assert Path(tmp_dir).exists()
assert Path(tmp_dir).is_dir()

assert op.project_dir == f"{tmp_dir}/"
assert op.profiles_dir == f"{tmp_dir}/"
assert op.state == f"{tmp_dir}/target"

assert Path(f"{tmp_dir}/profiles.yml").exists()
assert Path(f"{tmp_dir}/profiles.yml").is_file()
assert Path(f"{tmp_dir}/dbt_project.yml").exists()
Expand Down

0 comments on commit e951979

Please sign in to comment.