Add DbtDocsGCSOperator #616

Merged (14 commits) on Oct 25, 2023
Changes from 3 commits
2 changes: 2 additions & 0 deletions cosmos/operators/__init__.py
@@ -2,6 +2,7 @@
from .local import DbtDocsAzureStorageLocalOperator as DbtDocsAzureStorageOperator
from .local import DbtDocsLocalOperator as DbtDocsOperator
from .local import DbtDocsS3LocalOperator as DbtDocsS3Operator
from .local import DbtDocsGCSLocalOperator as DbtDocsGCSOperator
from .local import DbtLSLocalOperator as DbtLSOperator
from .local import DbtRunLocalOperator as DbtRunOperator
from .local import DbtRunOperationLocalOperator as DbtRunOperationOperator
@@ -20,4 +21,5 @@
"DbtDocsOperator",
"DbtDocsS3Operator",
"DbtDocsAzureStorageOperator",
"DbtDocsGCSOperator",
]
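A minimal sketch of how the new public alias is intended to be consumed once this change ships; only the import path added above is assumed, nothing else:

from cosmos.operators import DbtDocsGCSOperator  # resolves to cosmos.operators.local.DbtDocsGCSLocalOperator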
110 changes: 67 additions & 43 deletions cosmos/operators/local.py
@@ -7,6 +7,7 @@
from attr import define
from pathlib import Path
from typing import Any, Callable, Literal, Sequence, TYPE_CHECKING
from abc import ABC, abstractmethod

import airflow
import yaml
@@ -539,48 +540,54 @@ def __init__(self, **kwargs: Any) -> None:
self.base_cmd = ["docs", "generate"]


class DbtDocsS3LocalOperator(DbtDocsLocalOperator):
"""
Executes `dbt docs generate` command and upload to S3 storage. Returns the S3 path to the generated documentation.

:param aws_conn_id: S3's Airflow connection ID
:param bucket_name: S3's bucket name
:param folder_dir: This can be used to specify under which directory the generated DBT documentation should be
uploaded.
"""

ui_color = "#FF9900"

class DbtDocsCloudLocalOperator(DbtDocsLocalOperator, ABC):
def __init__(
self,
aws_conn_id: str,
connection_id: str,
bucket_name: str,
folder_dir: str | None = None,
**kwargs: str,
) -> None:
"Initializes the operator."
self.aws_conn_id = aws_conn_id
self.connection_id = connection_id
self.bucket_name = bucket_name
self.folder_dir = folder_dir

super().__init__(**kwargs)

# override the callback with our own
self.callback = self.upload_to_s3
self.callback = self.upload_to_cloud_storage

@abstractmethod
def upload_to_cloud_storage(self, project_dir: str) -> None:
pass


class DbtDocsS3LocalOperator(DbtDocsCloudLocalOperator):
"""
Executes `dbt docs generate` command and upload to S3 storage. Returns the S3 path to the generated documentation.

:param connection_id: S3's Airflow connection ID
:param bucket_name: S3's bucket name
:param folder_dir: This can be used to specify under which directory the generated DBT documentation should be
uploaded.
"""

ui_color = "#FF9900"

def upload_to_s3(self, project_dir: str) -> None:
def upload_to_cloud_storage(self, project_dir: str) -> None:
"Uploads the generated documentation to S3."
logger.info(
'Attempting to upload generated docs to S3 using S3Hook("%s")',
self.aws_conn_id,
self.connection_id,
)

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

target_dir = f"{project_dir}/target"

hook = S3Hook(
self.aws_conn_id,
self.connection_id,
extra_args={
"ContentType": "text/html",
},
@@ -599,67 +606,84 @@ def upload_to_s3(self, project_dir: str) -> None:
)


class DbtDocsAzureStorageLocalOperator(DbtDocsLocalOperator):
class DbtDocsAzureStorageLocalOperator(DbtDocsCloudLocalOperator):
"""
Executes `dbt docs generate` command and upload to Azure Blob Storage.

:param azure_conn_id: Azure Blob Storage's Airflow connection ID
:param container_name: Azure Blob Storage's bucket name
:param connection_id: Azure Blob Storage's Airflow connection ID
:param bucket_name: Azure Blob Storage's bucket name
:param folder_dir: This can be used to specify under which directory the generated DBT documentation should be
uploaded.
"""

ui_color = "#007FFF"

def __init__(
self,
azure_conn_id: str,
container_name: str,
folder_dir: str | None = None,
**kwargs: str,
) -> None:
"Initializes the operator."
self.azure_conn_id = azure_conn_id
self.container_name = container_name
self.folder_dir = folder_dir

super().__init__(**kwargs)

# override the callback with our own
self.callback = self.upload_to_azure

def upload_to_azure(self, project_dir: str) -> None:
def upload_to_cloud_storage(self, project_dir: str) -> None:
"Uploads the generated documentation to Azure Blob Storage."
logger.info(
'Attempting to upload generated docs to Azure Blob Storage using WasbHook(conn_id="%s")',
self.azure_conn_id,
self.connection_id,
)

from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

target_dir = f"{project_dir}/target"

hook = WasbHook(
self.azure_conn_id,
self.connection_id,
)

for filename in self.required_files:
logger.info(
"Uploading %s to %s",
filename,
f"wasb://{self.container_name}/{filename}",
f"wasb://{self.bucket_name}/{filename}",
)

blob_name = f"{self.folder_dir}/{filename}" if self.folder_dir else filename

hook.load_file(
file_path=f"{target_dir}/{filename}",
container_name=self.container_name,
container_name=self.bucket_name,
blob_name=blob_name,
overwrite=True,
)


class DbtDocsGCSLocalOperator(DbtDocsCloudLocalOperator):
"""
Executes `dbt docs generate` command and upload to GCS.

:param connection_id: Google Cloud Storage's Airflow connection ID
:param bucket_name: Google Cloud Storage's bucket name
:param folder_dir: This can be used to specify under which directory the generated DBT documentation should be
uploaded.
"""

ui_color = "#007FFF"

def upload_to_cloud_storage(self, project_dir: str) -> None:
"Uploads the generated documentation to Google Cloud Storage"
logger.info(
'Attempting to upload generated docs to Storage using GCSHook(conn_id="%s")',
self.connection_id,
)

from airflow.providers.google.cloud.hooks.gcs import GCSHook

target_dir = f"{project_dir}/target"
hook = GCSHook(self.connection_id)

for filename in self.required_files:
blob_name = f"{self.folder_dir}/{filename}" if self.folder_dir else filename
logger.info("Uploading %s to %s", filename, f"gs://{self.bucket_name}/{blob_name}")
hook.upload(
filename=f"{target_dir}/{filename}",
bucket_name=self.bucket_name,
object_name=blob_name,
)


class DbtDepsLocalOperator(DbtLocalBaseOperator):
"""
Executes a dbt core deps command.
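With the refactor above, each cloud-specific docs operator only needs to implement upload_to_cloud_storage; the shared DbtDocsCloudLocalOperator.__init__ stores connection_id, bucket_name and folder_dir and registers that method as the post-generation callback. Below is a minimal, hypothetical sketch of what a further backend could look like, assuming Cosmos with this change installed; the class name, ui_color and print placeholder are illustrative only and not part of this PR:

from cosmos.operators.local import DbtDocsCloudLocalOperator


class DbtDocsMyStoreLocalOperator(DbtDocsCloudLocalOperator):
    """Hypothetical docs operator for some other object store."""

    ui_color = "#CCCCCC"

    def upload_to_cloud_storage(self, project_dir: str) -> None:
        "Uploads the generated documentation to the hypothetical store."
        # dbt writes the docs artifacts under <project_dir>/target, mirroring the
        # S3/Azure/GCS implementations above.
        target_dir = f"{project_dir}/target"
        for filename in self.required_files:
            blob_name = f"{self.folder_dir}/{filename}" if self.folder_dir else filename
            # Replace this with the relevant provider hook's upload call.
            print(f"Would upload {target_dir}/{filename} to {self.bucket_name}/{blob_name}")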
16 changes: 13 additions & 3 deletions dev/dags/dbt_docs.py
@@ -20,6 +20,7 @@
from cosmos.operators import (
DbtDocsAzureStorageOperator,
DbtDocsS3Operator,
DbtDocsGCSOperator,
)
from cosmos.profiles import PostgresUserPasswordProfileMapping

@@ -28,6 +29,7 @@

S3_CONN_ID = "aws_docs"
AZURE_CONN_ID = "azure_docs"
GCS_CONN_ID = "gcs_docs"

profile_config = ProfileConfig(
profile_name="default",
@@ -72,16 +74,24 @@ def which_upload():
task_id="generate_dbt_docs_aws",
project_dir=DBT_ROOT_PATH / "jaffle_shop",
profile_config=profile_config,
aws_conn_id=S3_CONN_ID,
connection_id=S3_CONN_ID,
bucket_name="cosmos-docs",
)

generate_dbt_docs_azure = DbtDocsAzureStorageOperator(
task_id="generate_dbt_docs_azure",
project_dir=DBT_ROOT_PATH / "jaffle_shop",
profile_config=profile_config,
azure_conn_id=AZURE_CONN_ID,
container_name="$web",
connection_id=AZURE_CONN_ID,
bucket_name="$web",
)

generate_dbt_docs_gcs = DbtDocsGCSOperator(
    task_id="generate_dbt_docs_gcs",
project_dir=DBT_ROOT_PATH / "jaffle_shop",
profile_config=profile_config,
connection_id=GCS_CONN_ID,
bucket_name="cosmos-docs",
)

which_upload() >> [generate_dbt_docs_aws, generate_dbt_docs_azure, generate_dbt_docs_gcs]
30 changes: 26 additions & 4 deletions docs/configuration/generating-docs.rst
@@ -11,9 +11,10 @@ Cosmos offers two pre-built ways of generating and uploading dbt docs and a fallback

- :class:`~cosmos.operators.DbtDocsS3Operator`: generates and uploads docs to a S3 bucket.
- :class:`~cosmos.operators.DbtDocsAzureStorageOperator`: generates and uploads docs to an Azure Blob Storage.
- :class:`~cosmos.operators.DbtDocsGCSOperator`: generates and uploads docs to a GCS bucket.
- :class:`~cosmos.operators.DbtDocsOperator`: generates docs and runs a custom callback.

The first two operators require you to have a connection to the target storage. The third operator allows you to run custom code after the docs are generated in order to upload them to a storage of your choice.
The first three operators require you to have a connection to the target storage. The last operator allows you to run custom code after the docs are generated in order to upload them to a storage of your choice.


Examples
@@ -36,7 +37,7 @@ You can use the :class:`~cosmos.operators.DbtDocsS3Operator` to generate and upl
project_dir="path/to/jaffle_shop",
profile_config=profile_config,
# docs-specific arguments
aws_conn_id="test_aws",
connection_id="test_aws",
bucket_name="test_bucket",
)

@@ -57,8 +58,29 @@ You can use the :class:`~cosmos.operators.DbtDocsAzureStorageOperator` to genera
project_dir="path/to/jaffle_shop",
profile_config=profile_config,
# docs-specific arguments
azure_conn_id="test_azure",
container_name="$web",
connection_id="test_azure",
bucket_name="$web",
)

Upload to GCS
~~~~~~~~~~~~~~~~~~~~~~~

GCS supports serving static files directly from a bucket. To learn more (and to set it up), check out the `official GCS documentation <https://cloud.google.com/appengine/docs/standard/serving-static-files?tab=python>`_.

You can use the :class:`~cosmos.operators.DbtDocsGCSOperator` to generate and upload docs to a GCS bucket. The following code snippet shows how to do this with the default jaffle_shop project:

.. code-block:: python

from cosmos.operators import DbtDocsGCSOperator

# then, in your DAG code:
generate_dbt_docs_gcs = DbtDocsGCSOperator(
task_id="generate_dbt_docs_gcs",
project_dir="path/to/jaffle_shop",
profile_config=profile_config,
# docs-specific arguments
connection_id="test_gcs",
bucket_name="test_bucket",
)

Custom Callback
8 changes: 6 additions & 2 deletions tests/operators/test_local.py
@@ -24,6 +24,7 @@
DbtDocsLocalOperator,
DbtDocsS3LocalOperator,
DbtDocsAzureStorageLocalOperator,
DbtDocsGCSLocalOperator,
DbtSeedLocalOperator,
DbtRunOperationLocalOperator,
)
@@ -356,13 +357,16 @@ def test_operator_execute_with_flags(mock_build_and_run_cmd, operator_class, kwa
DbtDocsLocalOperator,
DbtDocsS3LocalOperator,
DbtDocsAzureStorageLocalOperator,
DbtDocsGCSLocalOperator,
),
)
@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd")
def test_operator_execute_without_flags(mock_build_and_run_cmd, operator_class):
cloud_docs_kwargs = {"connection_id": "fake-conn", "bucket_name": "fake-bucket"}
operator_class_kwargs = {
DbtDocsS3LocalOperator: {"aws_conn_id": "fake-conn", "bucket_name": "fake-bucket"},
DbtDocsAzureStorageLocalOperator: {"azure_conn_id": "fake-conn", "container_name": "fake-container"},
DbtDocsS3LocalOperator: cloud_docs_kwargs,
DbtDocsAzureStorageLocalOperator: cloud_docs_kwargs,
DbtDocsGCSLocalOperator: cloud_docs_kwargs,
}
task = operator_class(
profile_config=profile_config,
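For completeness, a hedged sketch of a focused unit test for the new GCS upload path, in the style of the existing tests in this module: the patch target works because GCSHook is imported inside the method, profile_config refers to the module-level config the surrounding tests already pass, and every other name is illustrative.

from unittest.mock import patch

from cosmos.operators.local import DbtDocsGCSLocalOperator


@patch("airflow.providers.google.cloud.hooks.gcs.GCSHook")
def test_dbt_docs_gcs_local_operator_uploads_required_files(mock_gcs_hook):
    operator = DbtDocsGCSLocalOperator(
        task_id="fake-docs-task",
        project_dir="/tmp/fake-project",
        profile_config=profile_config,
        connection_id="fake-conn",
        bucket_name="fake-bucket",
    )
    operator.upload_to_cloud_storage("/tmp/fake-project")
    # One GCSHook.upload call per generated artifact (index.html, manifest.json, ...).
    assert mock_gcs_hook.return_value.upload.call_count == len(operator.required_files)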