Add support to store and fetch dbt ls cache in remote stores (#1147)
This PR introduces the ability to store and retrieve the `dbt ls`
output cache in remote storage systems. This enhancement improves the
efficiency and scalability of cache management for Cosmos dbt projects
that use the `dbt ls` cache option (enabled by default) introduced in PR
#1014.

## Key Changes
1. **`dbt ls` Cache Storage in Remote Stores**:
Added support for storing the `dbt ls` cache as a JSON file in the remote
storage path configured in the Airflow settings under the `cosmos` section.
The cache is saved under the specified remote storage path, at a location
that includes the `cosmos_cache__` prefix.
2. **Cache Retrieval from Remote Stores**:
Implemented logic to check for the cache in the remote storage path
before falling back to the Variable cache.
If `remote_cache_dir` is specified and the path exists in the remote
store, the cache is read from there; if the specified path does not
exist, Cosmos attempts to create it.
3. **Backward Compatibility**:
Maintained backward compatibility by allowing users to continue using
local cache storage through Airflow Variables if a `remote_cache_dir` is
not specified.

## Impact
1. **Scalability**: Enables the use of remote, scalable storage systems
for dbt cache management.
2. **Performance**: Reduces the load on Airflow's metadata database by
offloading cache storage to external systems.
3. **Flexibility**: Provides users with the option to choose between
local (Airflow metadata using Variables) and remote cache storage based
on their infrastructure needs.

## Configuration
To leverage this feature, users need to set `remote_cache_dir` in the
`cosmos` section of their Airflow settings. This path should point to a
compatible remote storage location. You can also specify
`remote_cache_dir_conn_id`, the Airflow connection Cosmos uses to reach
your remote store. If it is not specified, Cosmos identifies the scheme
of the given path and uses the default Airflow connection ID for that
scheme.
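
For example, the two settings could be supplied as environment variables; the bucket name and connection ID below are placeholders, not defaults:

```shell
# Remote path for the dbt ls cache (any scheme supported by Airflow Object Storage).
export AIRFLOW__COSMOS__REMOTE_CACHE_DIR="s3://my-bucket/cosmos_cache/"
# Optional: the Airflow connection used to reach the remote store. If omitted,
# Cosmos falls back to the default connection ID for the path's scheme.
export AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID="my_aws_conn"
```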

## Testing
1. Tested with various remote storage backends (AWS S3 and GCP GCS) to
ensure compatibility and reliability.
2. Verified that cache retrieval falls back to the Variable-based caching
approach if `remote_cache_dir` is not configured.

## Documentation
Updated the documentation to include instructions on configuring
`remote_cache_dir`.

## Limitations
1. Users must be on Airflow 2.8 or higher, because the underlying
Airflow Object Storage feature used to access remote stores was
introduced in that version. Attempting to specify a `remote_cache_dir`
on an older Airflow version raises an error indicating the version
requirement.
2. Tasks spend slightly longer in the queued state (approximately 1-2
seconds of queued duration versus 0-1 seconds with the Variable
approach) because of the remote storage calls needed to retrieve the
cache.

Closes: #1072
pankajkoti authored Aug 16, 2024
1 parent e1ff924 commit 41053ed
Showing 9 changed files with 315 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cosmos/__init__.py
@@ -5,7 +5,7 @@
Contains dags, task groups, and operators.
"""
__version__ = "1.5.1"
__version__ = "1.6.0a6"


from cosmos.airflow.dag import DbtDag
103 changes: 103 additions & 0 deletions cosmos/cache.py
@@ -17,6 +17,7 @@
from airflow.models.dag import DAG
from airflow.utils.session import provide_session
from airflow.utils.task_group import TaskGroup
from airflow.version import version as airflow_version
from sqlalchemy import select
from sqlalchemy.orm import Session

@@ -25,22 +26,66 @@
DBT_MANIFEST_FILE_NAME,
DBT_TARGET_DIR_NAME,
DEFAULT_PROFILES_FILE_NAME,
FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP,
PACKAGE_LOCKFILE_YML,
)
from cosmos.dbt.project import get_partial_parse_path
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.settings import (
AIRFLOW_IO_AVAILABLE,
cache_dir,
dbt_profile_cache_dir_name,
enable_cache,
enable_cache_package_lockfile,
enable_cache_profile,
remote_cache_dir_conn_id,
)
from cosmos.settings import remote_cache_dir as settings_remote_cache_dir

logger = get_logger(__name__)
VAR_KEY_CACHE_PREFIX = "cosmos_cache__"


def _configure_remote_cache_dir() -> Path | None:
"""Configure the remote cache dir if it is provided."""
if not settings_remote_cache_dir:
return None

_configured_cache_dir = None

cache_dir_str = str(settings_remote_cache_dir)

remote_cache_conn_id = remote_cache_dir_conn_id
if not remote_cache_conn_id:
cache_dir_schema = cache_dir_str.split("://")[0]
remote_cache_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(cache_dir_schema, None) # type: ignore[assignment]
if remote_cache_conn_id is None:
return _configured_cache_dir

if not AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"You're trying to specify remote cache_dir {cache_dir_str}, but the required "
f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
"Airflow 2.8 or later."
)

from airflow.io.path import ObjectStoragePath

_configured_cache_dir = ObjectStoragePath(cache_dir_str, conn_id=remote_cache_conn_id)

if not _configured_cache_dir.exists(): # type: ignore[no-untyped-call]
# TODO: Check if we should raise an error instead in case the provided path does not exist.
_configured_cache_dir.mkdir(parents=True, exist_ok=True)

# raise CosmosValueError(
# f"remote_cache_path `{cache_dir_str}` does not exist or is not accessible using "
# f"remote_cache_conn_id `{remote_cache_conn_id}`"
# )

return _configured_cache_dir


def _get_airflow_metadata(dag: DAG, task_group: TaskGroup | None) -> dict[str, str | None]:
dag_id = None
task_group_id = None
@@ -366,6 +411,64 @@ def delete_unused_dbt_ls_cache(
return deleted_cosmos_variables


# TODO: Add integration tests once remote cache is supported in the CI pipeline
@provide_session
def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete Cosmos cache stored in remote storage based on the last execution of their associated DAGs.
"""
if session is None:
return 0

logger.info(f"Delete the Cosmos cache stored remotely that hasn't been used for {max_age_last_usage}")
cosmos_dags_ids_remote_cache_files = defaultdict(list)

configured_remote_cache_dir = _configure_remote_cache_dir()
if not configured_remote_cache_dir:
logger.info(
"No remote cache directory configured. Skipping the deletion of the dbt ls cache files in remote storage."
)
return 0

dirs = [obj for obj in configured_remote_cache_dir.iterdir() if obj.is_dir()]
files = [f for label in dirs for f in label.iterdir() if f.is_file()]

total_cosmos_remote_cache_files = 0
for file in files:
prefix_path = (configured_remote_cache_dir / VAR_KEY_CACHE_PREFIX).as_uri()
if file.as_uri().startswith(prefix_path):
with file.open("r") as fp:
cache_dict = json.load(fp)
cosmos_dags_ids_remote_cache_files[cache_dict["dag_id"]].append(file)
total_cosmos_remote_cache_files += 1

deleted_cosmos_remote_cache_files = 0

for dag_id, files in cosmos_dags_ids_remote_cache_files.items():
last_dag_run = (
session.query(DagRun)
.filter(
DagRun.dag_id == dag_id,
)
.order_by(DagRun.execution_date.desc())
.first()
)
if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
for file in files:
logger.info(f"Removing the dbt ls cache remote file {file}")
file.unlink()
deleted_cosmos_remote_cache_files += 1
logger.info(
"Deleted %s/%s dbt ls cache files in remote storage.",
deleted_cosmos_remote_cache_files,
total_cosmos_remote_cache_files,
)

return deleted_cosmos_remote_cache_files


def is_profile_cache_enabled() -> bool:
    """Return True if both the global and the profile cache are enabled."""
return enable_cache and enable_cache_profile
25 changes: 23 additions & 2 deletions cosmos/dbt/graph.py
Expand Up @@ -19,6 +19,7 @@

from cosmos import cache, settings
from cosmos.cache import (
_configure_remote_cache_dir,
_copy_cached_package_lockfile_to_project,
_get_latest_cached_package_lockfile,
is_cache_package_lockfile_enabled,
@@ -312,7 +313,22 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None:
"last_modified": datetime.datetime.now(datetime.timezone.utc).isoformat(),
**self.airflow_metadata,
}
Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True)
remote_cache_dir = _configure_remote_cache_dir()
if remote_cache_dir:
remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json"
with remote_cache_key_path.open("w") as fp:
json.dump(cache_dict, fp)
else:
Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True)

def _get_dbt_ls_remote_cache(self, remote_cache_dir: Path) -> dict[str, str]:
"""Loads the remote cache for dbt ls."""
cache_dict: dict[str, str] = {}
remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json"
if remote_cache_key_path.exists():
with remote_cache_key_path.open("r") as fp:
cache_dict = json.load(fp)
return cache_dict

def get_dbt_ls_cache(self) -> dict[str, str]:
"""
@@ -327,7 +343,12 @@ def get_dbt_ls_cache(self) -> dict[str, str]:
"""
cache_dict: dict[str, str] = {}
try:
cache_dict = Variable.get(self.dbt_ls_cache_key, deserialize_json=True)
remote_cache_dir = _configure_remote_cache_dir()
cache_dict = (
self._get_dbt_ls_remote_cache(remote_cache_dir)
if remote_cache_dir
else Variable.get(self.dbt_ls_cache_key, deserialize_json=True)
)
except (json.decoder.JSONDecodeError, KeyError):
return cache_dict
else:
12 changes: 11 additions & 1 deletion cosmos/settings.py
@@ -1,3 +1,5 @@
from __future__ import annotations

import os
import tempfile
from pathlib import Path
@@ -7,7 +9,10 @@
from airflow.version import version as airflow_version
from packaging.version import Version

from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE
from cosmos.constants import (
DEFAULT_COSMOS_CACHE_DIR_NAME,
DEFAULT_OPENLINEAGE_NAMESPACE,
)

# In MacOS users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change
DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME)
@@ -23,6 +28,11 @@
enable_cache_profile = conf.getboolean("cosmos", "enable_cache_profile", fallback=True)
dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile")

# Experimentally adding `remote_cache_dir` as a separate entity in the Cosmos 1.6 release to gather feedback.
# This will be merged with the `cache_dir` config parameter in upcoming releases.
remote_cache_dir = conf.get("cosmos", "remote_cache_dir", fallback=None)
remote_cache_dir_conn_id = conf.get("cosmos", "remote_cache_dir_conn_id", fallback=None)

try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
11 changes: 10 additions & 1 deletion dev/dags/example_cosmos_cleanup_dag.py
@@ -8,7 +8,7 @@

from airflow.decorators import dag, task

from cosmos.cache import delete_unused_dbt_ls_cache
from cosmos.cache import delete_unused_dbt_ls_cache, delete_unused_dbt_ls_remote_cache_files


@dag(
@@ -28,6 +28,15 @@ def clear_db_ls_cache(session=None):

clear_db_ls_cache()

@task()
def clear_db_ls_remote_cache(session=None):
"""
Delete the dbt ls remote cache files that have not been used for the last five days.
"""
delete_unused_dbt_ls_remote_cache_files(max_age_last_usage=timedelta(days=5))

clear_db_ls_remote_cache()


# [END cache_example]

17 changes: 14 additions & 3 deletions docs/configuration/caching.rst
@@ -19,7 +19,7 @@ It is possible to turn off any cache in Cosmos by exporting the environment vari
Disabling individual types of cache in Cosmos is also possible, as explained below.

Caching the dbt ls output
~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~

(Introduced in Cosmos 1.5)

@@ -29,13 +29,24 @@ also the tasks queueing time.

Cosmos 1.5 introduced a feature to mitigate the performance issue associated with ``LoadMode.DBT_LS`` by caching the output
of this command as an `Airflow Variable <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/variables.html>`_.
Based on an initial `analysis <https://github.com/astronomer/astronomer-cosmos/pull/1014>`_, enabling this setting reduced some DAGs ask queueing from 30s to 0s. Additionally, some users `reported improvements of 84% <https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343>`_ in the DAG run time.
Based on an initial `analysis <https://github.com/astronomer/astronomer-cosmos/pull/1014>`_, enabling this setting reduced some DAGs task queueing from 30s to 0s. Additionally, some users `reported improvements of 84% <https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343>`_ in the DAG run time.

This feature is on by default. To turn it off, export the following environment variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0``.

(Introduced in Cosmos 1.6 - Experimental feature)

Starting with Cosmos 1.6.0, users can also set a remote directory path to store this cache instead of using Airflow
Variables. To do so, you need to configure a remote cache directory. See :ref:`remote_cache_dir` and
:ref:`remote_cache_dir_conn_id` for more information. This is an experimental feature introduced in 1.6.0 to gather
user feedback. The ``remote_cache_dir`` will eventually be merged into the :ref:`cache_dir` setting in upcoming
releases.

**How the cache is refreshed**

Users can purge or delete the cache via Airflow UI by identifying and deleting the cache key.
If using the default Variable-based cache approach, users can purge or delete the cache via the Airflow UI by
identifying and deleting the cache key. If instead you use the ``remote_cache_dir`` approach introduced in
Cosmos 1.6.0, you can delete the cache by locating and removing the corresponding files under your configured path
in the remote store.

Cosmos will refresh the cache in a few circumstances:

25 changes: 25 additions & 0 deletions docs/configuration/cosmos-conf.rst
@@ -94,6 +94,31 @@ This page lists all available Airflow configurations that affect ``astronomer-co
- Default: ``profile``
- Environment Variable: ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME``

.. _remote_cache_dir:

`remote_cache_dir`_:
The remote directory to store the dbt cache. Starting with Cosmos 1.6.0, you can use this configuration to store
the `dbt ls` output cache in a remote location (an alternative to the Variable-based cache approach available
since Cosmos 1.5.0). The value for the remote cache directory can be any of the schemes that are supported by
the `Airflow Object Store <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/objectstorage.html>`_
feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``,
``abfs://your_azure_container/cache_dir``, etc.)

This is an experimental feature available since Cosmos 1.6 to gather user feedback and will be merged into the
``cache_dir`` setting in upcoming releases.

- Default: ``None``
- Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_DIR``

.. _remote_cache_dir_conn_id:

`remote_cache_dir_conn_id`_:
The connection ID for the remote cache directory. If this is not set, the default Airflow connection ID identified
for the scheme will be used.

- Default: ``None``
- Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID``

[openlineage]
~~~~~~~~~~~~~

