diff --git a/airflow_dbt/operators/dbt_operator.py b/airflow_dbt/operators/dbt_operator.py index 5f8d632..bb53f7a 100644 --- a/airflow_dbt/operators/dbt_operator.py +++ b/airflow_dbt/operators/dbt_operator.py @@ -154,6 +154,15 @@ def execute(self, context): self.create_hook().run_cli('deps') +class DbtBuildOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtBuildOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('build') + + class DbtCleanOperator(DbtBaseOperator): @apply_defaults def __init__(self, profiles_dir=None, target=None, *args, **kwargs): @@ -161,3 +170,66 @@ def __init__(self, profiles_dir=None, target=None, *args, **kwargs): def execute(self, context): self.create_hook().run_cli('clean') + + +class DbtCompileOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtCompileOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('compile') + + +class DbtDebugOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtDebugOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('debug') + + +class DbtInitOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtInitOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('init') + + +class DbtListOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtListOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('list') + + +class DbtParseOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtParseOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('parse') + + +class DbtSourceOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtSourceOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('source') + + +class DbtRunOperationOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtRunOperationOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('run-operation') diff --git a/tests/operators/test_dbt_operator.py b/tests/operators/test_dbt_operator.py index 78604d1..3c6b55a 100644 --- a/tests/operators/test_dbt_operator.py +++ b/tests/operators/test_dbt_operator.py @@ -1,76 +1,74 @@ import datetime -from unittest import TestCase, mock +from unittest import mock +from unittest.mock import MagicMock + +import pytest from airflow import DAG, configuration -from airflow_dbt.hooks.dbt_hook import DbtCliHook + +from airflow_dbt import DbtCliHook from airflow_dbt.operators.dbt_operator import ( + DbtBaseOperator, + DbtBuildOperator, + DbtCleanOperator, + DbtCompileOperator, + DbtDebugOperator, + DbtDepsOperator, + DbtDocsGenerateOperator, + DbtInitOperator, + DbtListOperator, + DbtParseOperator, + DbtRunOperator, DbtSeedOperator, DbtSnapshotOperator, - DbtRunOperator, + DbtSourceOperator, DbtTestOperator, - DbtDepsOperator, - DbtCleanOperator, ) -class TestDbtOperator(TestCase): - def setUp(self): - configuration.conf.load_test_config() - args = { - 'owner': 'airflow', - 'start_date': datetime.datetime(2020, 2, 27) - } - self.dag = DAG('test_dag_id', default_args=args) - - @mock.patch.object(DbtCliHook, 'run_cli') - def test_dbt_run(self, mock_run_cli): - operator = DbtRunOperator( - task_id='run', - dag=self.dag - ) - operator.execute(None) - mock_run_cli.assert_called_once_with('run') +@pytest.fixture +def spy_cli_run(mocker) -> MagicMock: + yield mocker.patch("airflow_dbt.hooks.dbt_hook.DbtCliHook.run_cli") - @mock.patch.object(DbtCliHook, 'run_cli') - def test_dbt_test(self, mock_run_cli): - operator = DbtTestOperator( - task_id='test', - dag=self.dag - ) - operator.execute(None) - mock_run_cli.assert_called_once_with('test') - @mock.patch.object(DbtCliHook, 'run_cli') - def test_dbt_snapshot(self, mock_run_cli): - operator = DbtSnapshotOperator( - task_id='snapshot', - dag=self.dag - ) - operator.execute(None) - mock_run_cli.assert_called_once_with('snapshot') +@pytest.fixture +def mock_dag() -> MagicMock: + configuration.conf.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': datetime.datetime(2020, 2, 27) + } + yield DAG('test_dag_id', default_args=args) - @mock.patch.object(DbtCliHook, 'run_cli') - def test_dbt_seed(self, mock_run_cli): - operator = DbtSeedOperator( - task_id='seed', - dag=self.dag - ) - operator.execute(None) - mock_run_cli.assert_called_once_with('seed') - @mock.patch.object(DbtCliHook, 'run_cli') - def test_dbt_deps(self, mock_run_cli): - operator = DbtDepsOperator( - task_id='deps', - dag=self.dag - ) - operator.execute(None) - mock_run_cli.assert_called_once_with('deps') - - @mock.patch.object(DbtCliHook, 'run_cli') - def test_dbt_clean(self, mock_run_cli): - operator = DbtCleanOperator( - task_id='clean', - dag=self.dag - ) - operator.execute(None) - mock_run_cli.assert_called_once_with('clean') +@pytest.mark.parametrize( + ['operator', 'expected_command'], + [ + (DbtRunOperator, ['run']), + (DbtTestOperator, ['test']), + (DbtSnapshotOperator, ['snapshot']), + (DbtDocsGenerateOperator, ['docs', 'generate']), + (DbtSeedOperator, ['seed']), + (DbtDepsOperator, ['deps']), + (DbtBuildOperator, ['build']), + (DbtCleanOperator, ['clean']), + (DbtCompileOperator, ['compile']), + (DbtDebugOperator, ['debug']), + (DbtInitOperator, ['init']), + (DbtListOperator, ['list']), + (DbtParseOperator, ['parse']), + (DbtListOperator, ['list']), + (DbtSourceOperator, ['source']), + ] +) +@mock.patch.object(DbtCliHook, 'run_cli') +def test_operators_commands( + spy_cli_run, + operator: DbtBaseOperator, + expected_command: [str], + mock_dag, +): + """Every operator passess down to the execution the correct dbt command""" + task_id = 'test_dbt_' + '_'.join(expected_command) + operator = operator(task_id=task_id, dag=mock_dag) + operator.execute(None) + spy_cli_run.assert_called_once_with(*expected_command)