This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Add the functionality to pass the env parameter to the Airflow worker #60

Merged

2 commits merged into gocardless:master on May 27, 2022

Conversation

RafalSiwek
Contributor

When working with dbt projects, there is often a need to pass environment variables into the worker context in which a given operator is executed.

This pull request introduces a solution similar to the BashOperator: an additional kwarg, called env, which allows passing environment variables to the context when using the existing operators:

import os
...

dbt_run = DbtRunOperator(
  task_id='dbt_run',
  env={
    'DBT_ENV_SECRET_DATABASE': '<DATABASE>',
    'DBT_ENV_SECRET_PASSWORD': '<PASSWORD>',
    'DBT_ENV_SECRET_SCHEMA': '<SCHEMA>',
    'USER_NAME': '<USER_NAME>',
    'DBT_THREADS': os.getenv('<DBT_THREADS_ENV_VARIABLE_NAME>'),
    'ENV_NAME': os.getenv('ENV_NAME')
  }
)

@RafalSiwek
Contributor Author

This PR should solve issue #36.

@mariotaddeucci

mariotaddeucci commented Apr 16, 2022

Hey @RafalSiwek, I loved that, this will really help me.
I have a suggestion for this feature: can you add env to the template fields?
By adding template_fields = ("env",) as a static class attribute, this would allow us to access the Airflow secrets manager with an expression like:
"DBT_HOST": "{{ conn.my_dbt_secret.host }}".
With this parameter it's possible to solve #39 too.

@RafalSiwek
Contributor Author

> Can you add env to the template fields? [...] With this parameter it's possible to solve #39 too.

Hey @mariotaddeucci, thank you for the suggestion, I've added the env parameter to the template fields list - feel free to check the last commit.
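
For illustration, here is a minimal usage sketch of the templated env field (not the exact PR code); the import path follows the package README, and the connection id my_dbt_secret is just a placeholder:

import os

from airflow_dbt.operators.dbt_operator import DbtRunOperator

# Because "env" is listed in template_fields, Jinja renders these values at
# runtime, so secrets can be pulled from Airflow connections instead of
# being hard-coded in the DAG file.
dbt_run = DbtRunOperator(
  task_id='dbt_run',
  env={
    'DBT_HOST': '{{ conn.my_dbt_secret.host }}',  # "my_dbt_secret" is a placeholder connection id
    'DBT_ENV_SECRET_PASSWORD': '{{ conn.my_dbt_secret.password }}',
    'ENV_NAME': os.getenv('ENV_NAME')
  }
)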

@dinigo

dinigo commented May 24, 2022

@RafalSiwek, nice indeed


@dinigo dinigo left a comment


Includes the feature itself, testing, and documentation (both in the README and in the Python code). This LGTM 👍

Contributor

@andrewrjones andrewrjones left a comment


Sorry for the delay, this looks great! Thanks also for adding docs.

I'll create a new release

@andrewrjones andrewrjones merged commit 17339d3 into gocardless:master May 27, 2022
@cvlendistry

@andrewrjones is there an ETA for the release with this PR included?

@dinigo

dinigo commented Aug 16, 2022

@cvlendistry, it should be supported now since it was merged into the main branch.

@AndyBys

AndyBys commented Sep 2, 2022

Hello,

Unfortunately, I am getting airflow.exceptions.AirflowException: Invalid arguments were passed to DbtRunOperator (task_id: dbt_run). Invalid arguments were: **kwargs: {'env': {here dict of my credentials}}. It's a bit strange; I'm just trying to reproduce the example from the docs.

airflow-dbt==0.4.0
apache-airflow==2.2.0

Has anyone encountered the same behaviour?

Not sure if this is a good place to discuss this; let me know if there is a more appropriate place to talk about it.

Thank you in advance!

@dinigo

dinigo commented Sep 5, 2022

@AndyBys, it hasn't been released yet. I've just opened a PR to try to move towards a more agile release process with gocardless: #67

@lrahal

lrahal commented Sep 23, 2022

Hi! Do you have an ETA for when the env parameter will be released? It's preventing me from using the package for now.

@emily-flambe

> Unfortunately, I am getting airflow.exceptions.AirflowException: Invalid arguments were passed to DbtRunOperator (task_id: dbt_run). Invalid arguments were: **kwargs: {'env': {here dict of my credentials}}. [...]

I am running into this exact same issue right now - is there a known workaround?

@RafalSiwek
Contributor Author

> Unfortunately, I am getting airflow.exceptions.AirflowException: Invalid arguments were passed to DbtRunOperator (task_id: dbt_run). [...]

> I am running into this exact same issue right now - is there a known workaround?

Hi, this issue still appears to be caused by the airflow-dbt package on PyPI not having been updated yet.
As a workaround, you can always inject the plugin code manually into your Airflow instance.
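
For anyone looking for a concrete example, a minimal sketch of that workaround, assuming the unreleased operator and hook modules have been copied into the DAGs folder as a local package (the dependencies/airflow_dbt_local path below is only an illustration):

import os

# Import the vendored copy of the unreleased operator instead of the
# airflow-dbt package installed from PyPI.
from dependencies.airflow_dbt_local.operators.dbt_operator import DbtRunOperator

dbt_run = DbtRunOperator(
  task_id='dbt_run',
  # env is supported by the vendored (post-PR) code, not by the released
  # airflow-dbt==0.4.0 package.
  env={'DBT_ENV_SECRET_PASSWORD': os.getenv('DBT_PASSWORD')}
)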

@emily-flambe

> you can always inject the plugin code manually into your airflow instance

Would be curious to see if anyone has an example of this - I can save the files and import them as local dependencies, but then I get this error:

[2022-10-13, 17:21:34 UTC] {taskinstance.py:1911} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/gcs/dags/cta-dags/dbt/dependencies/airflow_dbt_local/operators/dbt_operator.py", line 154, in execute
    self.create_hook().run_cli('deps')
  File "/home/airflow/gcs/dags/cta-dags/dbt/dependencies/airflow_dbt_local/hooks/dbt_hook.py", line 130, in run_cli
    sp = subprocess.Popen(
  File "/opt/python3.8/lib/python3.8/subprocess.py", line 858, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/opt/python3.8/lib/python3.8/subprocess.py", line 1704, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'dbt'

@qgaborit-ledger

The latest official airflow-dbt release dates back to September 2021, so the env parameter proposed in this PR is indeed not included yet. Does anyone have a workaround that still relies on DbtRunOperator and env vars? Or an ETA for this PR?

@kubaracek

@andrewrjones Can you release this to pip please?

@RafalSiwek
Contributor Author

RafalSiwek commented Mar 10, 2023

> you can always inject the plugin code manually into your airflow instance

> Would be curious to see if anyone has an example of this - I can save the files and import them as local dependencies, but then I get this error:
> FileNotFoundError: [Errno 2] No such file or directory: 'dbt'

To inject the custom operator into the Airflow setup, I used the tips from https://docs.astronomer.io/learn/airflow-importing-custom-hooks-operators
For AWS's MWAA, this required importing the files into the S3 bucket that MWAA is linked to:
https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-dag-folder.html

The FileNotFoundError: [Errno 2] No such file or directory: 'dbt' issue appears to be caused by not specifying the full executable path in the injected command, as recommended in the subprocess.Popen documentation.
In our MWAA implementation, we specified the dbt_bin parameter as an absolute path to the dbt executable:

dbt_run = DbtRunOperator(
  task_id='dbt_run',
  env=env_dict,
  dbt_bin='/usr/local/airflow/.local/bin/dbt',
  profiles_dir=RESOURCES_PATH,
  dir=RESOURCES_PATH
)

@alexanderspen

Hi - Also running into this issue

#60 (comment)

Any idea when this is going to be released and the airflow-dbt package updated on PyPI? Seems like this would solve a bunch of issues for a lot of people.

Cheers
