Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Add missing dbt commands #68

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions airflow_dbt/operators/dbt_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,82 @@ def execute(self, context):
self.create_hook().run_cli('deps')


class DbtBuildOperator(DbtBaseOperator):
@apply_defaults
def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtBuildOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

def execute(self, context):
self.create_hook().run_cli('build')


class DbtCleanOperator(DbtBaseOperator):
@apply_defaults
def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtCleanOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

def execute(self, context):
self.create_hook().run_cli('clean')


class DbtCompileOperator(DbtBaseOperator):
@apply_defaults
def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtCompileOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

def execute(self, context):
self.create_hook().run_cli('compile')


class DbtDebugOperator(DbtBaseOperator):
@apply_defaults
def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtDebugOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

def execute(self, context):
self.create_hook().run_cli('debug')


class DbtInitOperator(DbtBaseOperator):
@apply_defaults
def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtInitOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

def execute(self, context):
self.create_hook().run_cli('init')


class DbtListOperator(DbtBaseOperator):
@apply_defaults
def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtListOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

def execute(self, context):
self.create_hook().run_cli('list')


class DbtParseOperator(DbtBaseOperator):
@apply_defaults
def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtParseOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

def execute(self, context):
self.create_hook().run_cli('parse')


class DbtSourceOperator(DbtBaseOperator):
@apply_defaults
def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtSourceOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

def execute(self, context):
self.create_hook().run_cli('source')


class DbtRunOperationOperator(DbtBaseOperator):
@apply_defaults
def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtRunOperationOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flake8 complains about too long line here


def execute(self, context):
self.create_hook().run_cli('run-operation')
124 changes: 61 additions & 63 deletions tests/operators/test_dbt_operator.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,74 @@
import datetime
from unittest import TestCase, mock
from unittest import mock
from unittest.mock import MagicMock

import pytest
from airflow import DAG, configuration
from airflow_dbt.hooks.dbt_hook import DbtCliHook

from airflow_dbt import DbtCliHook
from airflow_dbt.operators.dbt_operator import (
DbtBaseOperator,
DbtBuildOperator,
DbtCleanOperator,
DbtCompileOperator,
DbtDebugOperator,
DbtDepsOperator,
DbtDocsGenerateOperator,
DbtInitOperator,
DbtListOperator,
DbtParseOperator,
DbtRunOperator,
DbtSeedOperator,
DbtSnapshotOperator,
DbtRunOperator,
DbtSourceOperator,
DbtTestOperator,
DbtDepsOperator,
DbtCleanOperator,
)


class TestDbtOperator(TestCase):
def setUp(self):
configuration.conf.load_test_config()
args = {
'owner': 'airflow',
'start_date': datetime.datetime(2020, 2, 27)
}
self.dag = DAG('test_dag_id', default_args=args)

@mock.patch.object(DbtCliHook, 'run_cli')
def test_dbt_run(self, mock_run_cli):
operator = DbtRunOperator(
task_id='run',
dag=self.dag
)
operator.execute(None)
mock_run_cli.assert_called_once_with('run')
@pytest.fixture
def spy_cli_run(mocker) -> MagicMock:
yield mocker.patch("airflow_dbt.hooks.dbt_hook.DbtCliHook.run_cli")

@mock.patch.object(DbtCliHook, 'run_cli')
def test_dbt_test(self, mock_run_cli):
operator = DbtTestOperator(
task_id='test',
dag=self.dag
)
operator.execute(None)
mock_run_cli.assert_called_once_with('test')

@mock.patch.object(DbtCliHook, 'run_cli')
def test_dbt_snapshot(self, mock_run_cli):
operator = DbtSnapshotOperator(
task_id='snapshot',
dag=self.dag
)
operator.execute(None)
mock_run_cli.assert_called_once_with('snapshot')
@pytest.fixture
def mock_dag() -> MagicMock:
configuration.conf.load_test_config()
args = {
'owner': 'airflow',
'start_date': datetime.datetime(2020, 2, 27)
}
yield DAG('test_dag_id', default_args=args)

@mock.patch.object(DbtCliHook, 'run_cli')
def test_dbt_seed(self, mock_run_cli):
operator = DbtSeedOperator(
task_id='seed',
dag=self.dag
)
operator.execute(None)
mock_run_cli.assert_called_once_with('seed')

@mock.patch.object(DbtCliHook, 'run_cli')
def test_dbt_deps(self, mock_run_cli):
operator = DbtDepsOperator(
task_id='deps',
dag=self.dag
)
operator.execute(None)
mock_run_cli.assert_called_once_with('deps')

@mock.patch.object(DbtCliHook, 'run_cli')
def test_dbt_clean(self, mock_run_cli):
operator = DbtCleanOperator(
task_id='clean',
dag=self.dag
)
operator.execute(None)
mock_run_cli.assert_called_once_with('clean')
@pytest.mark.parametrize(
['operator', 'expected_command'],
[
(DbtRunOperator, ['run']),
(DbtTestOperator, ['test']),
(DbtSnapshotOperator, ['snapshot']),
(DbtDocsGenerateOperator, ['docs', 'generate']),
(DbtSeedOperator, ['seed']),
(DbtDepsOperator, ['deps']),
(DbtBuildOperator, ['build']),
(DbtCleanOperator, ['clean']),
(DbtCompileOperator, ['compile']),
(DbtDebugOperator, ['debug']),
(DbtInitOperator, ['init']),
(DbtListOperator, ['list']),
(DbtParseOperator, ['parse']),
(DbtListOperator, ['list']),
(DbtSourceOperator, ['source']),
]
)
@mock.patch.object(DbtCliHook, 'run_cli')
def test_operators_commands(
spy_cli_run,
operator: DbtBaseOperator,
expected_command: [str],
mock_dag,
):
"""Every operator passess down to the execution the correct dbt command"""
task_id = 'test_dbt_' + '_'.join(expected_command)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flake8 complains about an extra space here

operator = operator(task_id=task_id, dag=mock_dag)
operator.execute(None)
spy_cli_run.assert_called_once_with(*expected_command)