Skip to content

Commit

Permalink
feat: Ensure compatibility with dbt>=1.4,<1.8 (#127)
Browse files Browse the repository at this point in the history
* fix: Check for tasks without a profile without UnsetProfileConfig

* chore: Version bumps in lockfile and package

* fix: Address incompatibilities with dbt-core 1.5

* fix: Address incompatibilities with dbt-core 1.5

* Ensure compatibility with dbt>=1.4,<1.8

Dropped support for Python 3.7, dbt <1.4, Airflow <2.2

* Exclude broken dulwich 0.21.6

jelmer/dulwich#1208

* Fix CI

- fix airflow version for exception
- add github.com:80 to the list of allowed endpoints

* Add airflow version for matrix exception

* chore(ci): Drop requirement of secret gh token

I want to see if this is required.

---------

Co-authored-by: Tomás Farías Santana <[email protected]>
  • Loading branch information
millin and tomasfarias authored Dec 2, 2023
1 parent 2ebf062 commit d4acf2c
Show file tree
Hide file tree
Showing 26 changed files with 3,984 additions and 2,992 deletions.
70 changes: 52 additions & 18 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,72 @@ on:

jobs:
test:
name: Test on Python ${{ matrix.python-version }} and Airflow ${{ matrix.airflow-version }}
name: Test on Python ${{ matrix.python-version }} and Airflow ${{ matrix.airflow-version }} and dbt ${{ matrix.dbt-version }}
strategy:
fail-fast: false
matrix:
python-version: [3.7, 3.8, 3.9, '3.10', '3.11']
airflow-version: ['2.5.1', '2.4.3']
python-version:
- '3.11'
- '3.10'
- '3.9'
- '3.8'
airflow-version:
- '2.7.2'
- '2.6.3'
- '2.5.3'
- '2.4.3'
dbt-version:
- 1.7
- 1.6
- 1.5
- 1.4
exclude:
# No constraints available
- python-version: '3.11'
# Incompatible combinations
- python-version: 3.11
airflow-version: '2.4.3'

- python-version: 3.11
airflow-version: '2.5.3'

runs-on: ubuntu-latest
steps:
- name: Harden Runner
uses: step-security/harden-runner@v1
uses: step-security/harden-runner@v2.6.1
with:
egress-policy: block
allowed-endpoints: >
api.github.com:443
files.pythonhosted.org:443
hub.getdbt.com:443
github.com:80
github.com:443
gitlab.com:80
gitlab.com:443
objects.githubusercontent.com:443
raw.githubusercontent.com:443
pypi.org:443
- uses: actions/checkout@v3
archive.ubuntu.com:80
azure.archive.ubuntu.com:80
esm.ubuntu.com:443
motd.ubuntu.com:443
packages.microsoft.com:80
ppa.launchpadcontent.net:443
security.ubuntu.com:80
- run: |
sudo apt-get update
sudo apt-get install --yes --no-install-recommends postgresql
- uses: actions/[email protected]
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v4.7.1
with:
python-version: ${{ matrix.python-version }}

- name: Install Poetry
uses: abatilo/actions-poetry@v2.1.4
uses: abatilo/actions-poetry@v2.3.0
with:
poetry-version: 1.3.2
poetry-version: 1.7.0

- name: Install airflow-dbt-python with Poetry
run: poetry install -E postgres --with dev
Expand All @@ -61,12 +91,16 @@ jobs:
run: |
wget https://raw.githubusercontent.com/apache/airflow/constraints-${{ matrix.airflow-version }}/constraints-${{ matrix.python-version }}.txt -O constraints.txt
poetry run pip install apache-airflow==${{ matrix.airflow-version }} apache-airflow-providers-amazon apache-airflow-providers-ssh -c constraints.txt
poetry run pip install "dbt-core~=${{ matrix.dbt-version }}.0" "dbt-postgres~=${{ matrix.dbt-version }}.0"
poetry run airflow db init
- name: Linting with ruff
run: poetry run ruff .

- name: Static type checking with mypy
# We only run mypy on the latest supported versions of Airflow & dbt,
# so it is currently impossible to write conditions for that depend on package versions.
if: matrix.airflow-version == '2.7.2' && matrix.dbt-version == '1.7'
run: poetry run mypy .

- name: Code formatting with black
Expand Down Expand Up @@ -97,7 +131,7 @@ jobs:

steps:
- name: Harden Runner
uses: step-security/harden-runner@v1
uses: step-security/harden-runner@v2.6.0
with:
egress-policy: block
allowed-endpoints: >
Expand All @@ -106,18 +140,18 @@ jobs:
api.github.com:443
pypi.org:443
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
- uses: actions/checkout@v4.1.1
- uses: actions/setup-python@v4.7.1
with:
python-version: '3.10'
python-version: '3.11'

- name: Install Poetry
uses: abatilo/actions-poetry@v2.1.4
uses: abatilo/actions-poetry@v2.3.0
with:
poetry-version: 1.3.2
poetry-version: 1.7.0

- name: Install airflow-dbt-python with Poetry
run: poetry install --with dev -E airflow -E airflow-providers
run: poetry install --with dev -E airflow-providers

- name: Download coverage data.
uses: actions/download-artifact@v3
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docs_pages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
poetry-version: 1.3.2

- name: Install airflow-dbt-python with Poetry
run: poetry install -E airflow -E airflow-providers --with docs
run: poetry install -E airflow-providers --with docs

- name: Install Graphviz
run: sudo apt-get install graphviz
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ dbt.log
models/
data/
dbt_packages/
webserver_config.py
.env/
.idea/
2 changes: 1 addition & 1 deletion .readthedocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ build:
post_install:
- pip install poetry==1.3.2
- poetry config virtualenvs.create false
- poetry install -E airflow -E git -E s3 --with docs
- poetry install -E git -E s3 --with docs
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ Read the [documentation](https://airflow-dbt-python.readthedocs.io) for examples
## Requirements

Before using *airflow-dbt-python*, ensure you meet the following requirements:
* A *dbt* project using [dbt-core](https://pypi.org/project/dbt-core/) version 1.0.0 or later.
* A *dbt* project using [dbt-core](https://pypi.org/project/dbt-core/) version 1.4.0 or later.
* An Airflow environment using version 2.2 or later.

* If using any managed service, like AWS MWAA, ensure your environment is created with a supported version of Airflow.
* If self-hosting, Airflow installation instructions can be found in their [official documentation](https://airflow.apache.org/docs/apache-airflow/stable/installation/index.html).

* Running Python 3.7 or later in your Airflow environment.
* Running Python 3.8 or later in your Airflow environment.

> **Warning**
>
Expand Down
2 changes: 1 addition & 1 deletion airflow_dbt_python/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
__author__ = "Tomás Farías Santana"
__copyright__ = "Copyright 2021 Tomás Farías Santana"
__title__ = "airflow-dbt-python"
__version__ = "1.0.5"
__version__ = "1.1.0"
120 changes: 78 additions & 42 deletions airflow_dbt_python/hooks/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@
from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection

from airflow_dbt_python.utils.version import DBT_INSTALLED_LESS_THAN_1_5

if sys.version_info >= (3, 11):
from contextlib import chdir as chdir_ctx
else:
from contextlib_chdir import chdir as chdir_ctx


if TYPE_CHECKING:
from dbt.contracts.results import RunResult
from dbt.task.base import BaseTask
Expand Down Expand Up @@ -208,6 +216,7 @@ def run_dbt_task(
replace_on_upload: bool = False,
artifacts: Optional[Iterable[str]] = None,
env_vars: Optional[Dict[str, Any]] = None,
write_perf_info: bool = False,
**kwargs,
) -> DbtTaskResult:
"""Run a dbt task with a given configuration and return the results.
Expand All @@ -219,9 +228,17 @@ def run_dbt_task(
of running the dbt command.
"""
from dbt.adapters.factory import register_adapter
from dbt.config.runtime import UnsetProfileConfig
from dbt.main import adapter_management, track_run
from dbt.task.base import move_to_nearest_project_dir
from dbt.task.base import get_nearest_project_dir
from dbt.task.clean import CleanTask
from dbt.task.deps import DepsTask

from airflow_dbt_python.utils.version import DBT_INSTALLED_LESS_THAN_1_5

if DBT_INSTALLED_LESS_THAN_1_5:
from dbt.main import adapter_management, track_run # type: ignore
else:
from dbt.adapters.factory import adapter_management
from dbt.tracking import track_run

config = self.get_dbt_task_config(command, **kwargs)
extra_target = self.get_dbt_target_from_connection(config.target)
Expand All @@ -233,53 +250,62 @@ def run_dbt_task(
replace_on_upload=replace_on_upload,
env_vars=env_vars,
) as dbt_dir:
config.dbt_task.pre_init_hook(config)
self.ensure_profiles(config.profiles_dir)

task, runtime_config = config.create_dbt_task(extra_target)

# When creating tasks via from_args, dbt switches to the project directory.
# We have to do that here as we are not using from_args.
move_to_nearest_project_dir(config)
if DBT_INSTALLED_LESS_THAN_1_5:
# For compatibility with older versions of dbt, as the signature
# of move_to_nearest_project_dir changed in dbt-core 1.5 to take
# just the project_dir.
nearest_project_dir = get_nearest_project_dir(config) # type: ignore
else:
nearest_project_dir = get_nearest_project_dir(config.project_dir)

with chdir_ctx(nearest_project_dir):
config.dbt_task.pre_init_hook(config)
self.ensure_profiles(config)

self.setup_dbt_logging(task, config.debug)
task, runtime_config = config.create_dbt_task(
extra_target, write_perf_info
)
requires_profile = isinstance(task, (CleanTask, DepsTask))

if not isinstance(runtime_config, UnsetProfileConfig):
if runtime_config is not None:
self.setup_dbt_logging(task, config.debug)

if runtime_config is not None and not requires_profile:
# The deps command installs the dependencies, which means they may
# not exist before deps runs and the following would raise a
# CompilationError.
runtime_config.load_dependencies()

results = None
with adapter_management():
if not isinstance(runtime_config, UnsetProfileConfig):
if runtime_config is not None:
register_adapter(runtime_config)
results = None
with adapter_management():
if not requires_profile:
if runtime_config is not None:
register_adapter(runtime_config)

with track_run(task):
results = task.run()
success = task.interpret_results(results)
with track_run(task):
results = task.run()
success = task.interpret_results(results)

if artifacts is None:
return DbtTaskResult(success, results, {})
if artifacts is None:
return DbtTaskResult(success, results, {})

saved_artifacts = {}
for artifact in artifacts:
artifact_path = Path(dbt_dir) / "target" / artifact
saved_artifacts = {}
for artifact in artifacts:
artifact_path = Path(dbt_dir) / "target" / artifact

if not artifact_path.exists():
self.log.warn(
"Required dbt artifact %s was not found. "
"Perhaps dbt failed and couldn't generate it.",
artifact,
)
continue
if not artifact_path.exists():
self.log.warning(
"Required dbt artifact %s was not found. "
"Perhaps dbt failed and couldn't generate it.",
artifact,
)
continue

with open(artifact_path) as artifact_file:
json_artifact = json.load(artifact_file)
with open(artifact_path) as artifact_file:
json_artifact = json.load(artifact_file)

saved_artifacts[artifact] = json_artifact
saved_artifacts[artifact] = json_artifact

return DbtTaskResult(success, results, saved_artifacts)

Expand Down Expand Up @@ -373,7 +399,7 @@ def prepare_directory(
if (project_dir_path / "profiles.yml").exists():
# We may have downloaded the profiles.yml file together
# with the project.
return (new_project_dir, new_project_dir)
return new_project_dir, new_project_dir

if profiles_dir is not None:
profiles_file_path = self.download_dbt_profiles(
Expand All @@ -384,7 +410,7 @@ def prepare_directory(
else:
new_profiles_dir = None

return (new_project_dir, new_profiles_dir)
return new_project_dir, new_profiles_dir

def setup_dbt_logging(self, task: BaseTask, debug: Optional[bool]):
"""Setup dbt logging.
Expand All @@ -399,10 +425,18 @@ def setup_dbt_logging(self, task: BaseTask, debug: Optional[bool]):
if task.config is not None:
log_path = getattr(task.config, "log_path", None)

setup_event_logger(log_path or "logs")
if DBT_INSTALLED_LESS_THAN_1_5:
setup_event_logger(log_path or "logs")
else:
from dbt.flags import get_flags

flags = get_flags()
setup_event_logger(flags)

configured_file = logging.getLogger("configured_file")
file_log = logging.getLogger("file_log")
stdout_log = logging.getLogger("stdout_log")
stdout_log.propagate = True

if not debug:
# We have to do this after setting logs up as dbt hasn't
Expand All @@ -413,16 +447,18 @@ def setup_dbt_logging(self, task: BaseTask, debug: Optional[bool]):
configured_file.setLevel("INFO")
configured_file.propagate = False

def ensure_profiles(self, profiles_dir: Optional[str]):
def ensure_profiles(self, config: BaseConfig):
"""Ensure a profiles file exists."""
if profiles_dir is not None:
# We expect one to exist given that we have passsed a profiles_dir.
if config.profiles_dir is not None:
# We expect one to exist given that we have passed a profiles_dir.
return

profiles_path = Path.home() / ".dbt/profiles.yml"
config.profiles_dir = str(profiles_path.parent)
if not profiles_path.exists():
profiles_path.parent.mkdir(exist_ok=True)
profiles_path.touch()
with profiles_path.open("w", encoding="utf-8") as f:
f.write("config:\n send_anonymous_usage_stats: false\n")

def get_dbt_target_from_connection(
self, target: Optional[str]
Expand Down
2 changes: 1 addition & 1 deletion airflow_dbt_python/hooks/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,4 @@ def get_git_client_path(self, url: URL) -> Tuple[GitClients, str]:
else:
raise ValueError(f"Unsupported scheme: {url.scheme}")

return (client, path)
return client, path
Loading

0 comments on commit d4acf2c

Please sign in to comment.