diff --git a/airflow_dbt_python/__version__.py b/airflow_dbt_python/__version__.py index 9b2818a..be0a9c7 100644 --- a/airflow_dbt_python/__version__.py +++ b/airflow_dbt_python/__version__.py @@ -2,4 +2,4 @@ __author__ = "Tomás Farías Santana" __copyright__ = "Copyright 2021 Tomás Farías Santana" __title__ = "airflow-dbt-python" -__version__ = "0.8.0" +__version__ = "0.8.1" diff --git a/airflow_dbt_python/operators/dbt.py b/airflow_dbt_python/operators/dbt.py index 36170fb..6f79bcd 100644 --- a/airflow_dbt_python/operators/dbt.py +++ b/airflow_dbt_python/operators/dbt.py @@ -219,6 +219,14 @@ def dbt_directory(self) -> Iterator[str]: with TemporaryDirectory(prefix="airflowtmp") as tmp_dir: self.prepare_directory(tmp_dir) + + if getattr(self, "state", None) is not None: + state = Path(getattr(self, "state", "")) + # Since we are running in a temporary directory, we need to make + # state paths relative to this temporary directory. + if not state.is_absolute(): + setattr(self, "state", str(Path(tmp_dir) / state)) + yield tmp_dir self.profiles_dir = store_profiles_dir diff --git a/pyproject.toml b/pyproject.toml index 47f2c1e..1eea650 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "airflow-dbt-python" -version = "0.8.0" +version = "0.8.1" description = "A dbt operator for Airflow that uses the dbt Python package" authors = ["Tomás Farías Santana "] license = "MIT" diff --git a/tests/test_dbt_base.py b/tests/test_dbt_base.py index 52410d4..ef3f474 100644 --- a/tests/test_dbt_base.py +++ b/tests/test_dbt_base.py @@ -125,25 +125,59 @@ def test_prepare_args_with_positional(): @no_s3_hook def test_dbt_base_dbt_directory(): - """Test dbt_directory yields a temporary directory""" + """Test dbt_directory yields a temporary directory.""" op = DbtBaseOperator( task_id="dbt_task", project_dir="/path/to/project/", profiles_dir="/path/to/profiles/", ) + op.state = "target/" + with op.dbt_directory() as tmp_dir: assert Path(tmp_dir).exists() assert op.project_dir == "/path/to/project/" assert op.profiles_dir == "/path/to/profiles/" + assert op.state == f"{tmp_dir}/target" + + +@no_s3_hook +def test_dbt_base_dbt_directory_with_absolute_state(): + """Test dbt_directory does not alter state when not needed.""" + op = DbtBaseOperator( + task_id="dbt_task", + project_dir="/path/to/project/", + profiles_dir="/path/to/profiles/", + ) + op.state = "/absolute/path/to/target/" + + with op.dbt_directory() as tmp_dir: + assert Path(tmp_dir).exists() + assert op.state == "/absolute/path/to/target/" + + +@no_s3_hook +def test_dbt_base_dbt_directory_with_no_state(): + """Test dbt_directory does not alter state when not needed.""" + op = DbtBaseOperator( + task_id="dbt_task", + project_dir="/path/to/project/", + profiles_dir="/path/to/profiles/", + ) + + with op.dbt_directory() as tmp_dir: + assert Path(tmp_dir).exists() + assert getattr(op, "state", None) is None @no_s3_hook def test_dbt_base_dbt_directory_changed_to_s3( dbt_project_file, profiles_file, s3_bucket ): - """ - Test dbt_directory yields a temporary directory and updates profile_dir and - profiles_dir when files have been pulled from s3 + """Test dbt_directory yields a temporary directory and updates attributes. + + Certain attributes, like project_dir, profiles_dir, and state, need to be updated to + work once a temporary directory has been created, in particular, when pulling from + S3. """ hook = DbtS3Hook() bucket = hook.get_bucket(s3_bucket) @@ -161,11 +195,16 @@ def test_dbt_base_dbt_directory_changed_to_s3( project_dir=f"s3://{s3_bucket}/dbt/project/", profiles_dir=f"s3://{s3_bucket}/dbt/profiles/", ) + op.state = "target/" + with op.dbt_directory() as tmp_dir: assert Path(tmp_dir).exists() assert Path(tmp_dir).is_dir() + assert op.project_dir == f"{tmp_dir}/" assert op.profiles_dir == f"{tmp_dir}/" + assert op.state == f"{tmp_dir}/target" + assert Path(f"{tmp_dir}/profiles.yml").exists() assert Path(f"{tmp_dir}/profiles.yml").is_file() assert Path(f"{tmp_dir}/dbt_project.yml").exists()