Skip to content

Commit

Permalink
UPDATE: attribute handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaspeters committed Nov 2, 2023
1 parent 92c54f6 commit 883d7b6
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 6 deletions.
5 changes: 5 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## Changelog

## master

- Update: Mesos Attribute handling.
- Update: Airflow configuration parameter to the new version.

## v0.2.2

- Fix: wrong task id output in airflow logs.
Expand Down
8 changes: 4 additions & 4 deletions avmesos_airflow_provider/executors/mesos_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __init__(
self.mesos_fetch_uri = conf.get("mesos", "MESOS_FETCH_URI", fallback="")
self.mesos_fetch_uri_username = conf.get("mesos", "MESOS_FETCH_URI_USERNAME", fallback="root")
self.mesos_attributes = conf.getboolean("mesos", "ATTRIBUTES", fallback=False)
self.core_sql_alchemy_conn = conf.get("core", "SQL_ALCHEMY_CONN")
self.database_sql_alchemy_conn = conf.get("database", "SQL_ALCHEMY_CONN")
self.core_fernet_key = conf.get("core", "FERNET_KEY")
self.logging_logging_level = conf.get("logging", "LOGGING_LEVEL")
self.command_shell = str(
Expand Down Expand Up @@ -224,7 +224,7 @@ def run_job(self, mesos_offer):
if attribute["name"] == "airflow":
attribute_airflow = attribute["text"]["value"]

if attribute_airflow == "false":
if attribute_airflow.lower() == "false":
self.log.info("Offered node is not valid for airflow jobs. %s (%s)", airflow_task_id, offer["id"]["value"])
self.task_queue.put((key, cmd, executor_config))
return False
Expand Down Expand Up @@ -279,8 +279,8 @@ def run_job(self, mesos_offer):
"environment": {
"variables": [
{
"name": "AIRFLOW__CORE__SQL_ALCHEMY_CONN",
"value": self.core_sql_alchemy_conn,
"name": "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN",
"value": self.database_sql_alchemy_conn,
},
{
"name": "AIRFLOW__CORE__FERNET_KEY",
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_track_started = True
task_adoption_timeout = 600
task_publish_max_retries = 3
worker_precheck = False

Expand Down Expand Up @@ -231,6 +230,7 @@ use_row_level_locking = True
parsing_processes = 2
use_job_schedule = True
allow_trigger_in_future = False
task_adoption_timeout = 600

[kerberos]
ccache = /tmp/airflow_krb5_ccache
Expand Down
2 changes: 1 addition & 1 deletion shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ shellHook = ''
cp docs/examples/airflow.cfg /home/$USER/airflow/
cp docs/examples/dags/* /tmp/dags/
cp docs/nixshell/lighttpd.conf /tmp/
airflow db init
airflow db migrate
airflow users create --username admin --role Admin -e [email protected] -f admin -l admin --password admin
# Webserver listen on 8881
Expand Down

0 comments on commit 883d7b6

Please sign in to comment.