diff --git a/CHANGES.txt b/CHANGES.txt
index 1c01309..ec301b5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,10 @@
 ## Changelog
 
+## master
+
+- Update: Mesos Attribute handling.
+- Update: Airflow configuration parameter to the new version.
+
 ## v0.2.2
 
 - Fix: wrong task id output in airflow logs.
diff --git a/avmesos_airflow_provider/executors/mesos_executor.py b/avmesos_airflow_provider/executors/mesos_executor.py
index a4b865c..e08f92d 100644
--- a/avmesos_airflow_provider/executors/mesos_executor.py
+++ b/avmesos_airflow_provider/executors/mesos_executor.py
@@ -106,7 +106,7 @@ def __init__(
         self.mesos_fetch_uri = conf.get("mesos", "MESOS_FETCH_URI", fallback="")
         self.mesos_fetch_uri_username = conf.get("mesos", "MESOS_FETCH_URI_USERNAME", fallback="root")
         self.mesos_attributes = conf.getboolean("mesos", "ATTRIBUTES", fallback=False)
-        self.core_sql_alchemy_conn = conf.get("core", "SQL_ALCHEMY_CONN")
+        self.database_sql_alchemy_conn = conf.get("database", "SQL_ALCHEMY_CONN")
         self.core_fernet_key = conf.get("core", "FERNET_KEY")
         self.logging_logging_level = conf.get("logging", "LOGGING_LEVEL")
         self.command_shell = str(
@@ -224,7 +224,7 @@ def run_job(self, mesos_offer):
                 if attribute["name"] == "airflow":
                     attribute_airflow = attribute["text"]["value"]
 
-            if attribute_airflow == "false":
+            if attribute_airflow.lower() == "false":
                 self.log.info("Offered node is not valid for airflow jobs. %s (%s)", airflow_task_id, offer["id"]["value"])
                 self.task_queue.put((key, cmd, executor_config))
                 return False
@@ -279,8 +279,8 @@ def run_job(self, mesos_offer):
                     "environment": {
                         "variables": [
                             {
-                                "name": "AIRFLOW__CORE__SQL_ALCHEMY_CONN",
-                                "value": self.core_sql_alchemy_conn,
+                                "name": "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN",
+                                "value": self.database_sql_alchemy_conn,
                             },
                             {
                                 "name": "AIRFLOW__CORE__FERNET_KEY",
diff --git a/docs/examples/airflow.cfg b/docs/examples/airflow.cfg
index 3835d79..a9a6856 100644
--- a/docs/examples/airflow.cfg
+++ b/docs/examples/airflow.cfg
@@ -199,7 +199,6 @@ ssl_cacert =
 pool = prefork
 operation_timeout = 1.0
 task_track_started = True
-task_adoption_timeout = 600
 task_publish_max_retries = 3
 worker_precheck = False
 
@@ -231,6 +230,7 @@ use_row_level_locking = True
 parsing_processes = 2
 use_job_schedule = True
 allow_trigger_in_future = False
+task_adoption_timeout = 600
 
 [kerberos]
 ccache = /tmp/airflow_krb5_ccache
diff --git a/shell.nix b/shell.nix
index e3215d9..3e3e720 100644
--- a/shell.nix
+++ b/shell.nix
@@ -38,7 +38,7 @@ shellHook = ''
     cp docs/examples/airflow.cfg /home/$USER/airflow/
     cp docs/examples/dags/* /tmp/dags/
     cp docs/nixshell/lighttpd.conf /tmp/
-    airflow db init
+    airflow db migrate
     airflow users create --username admin --role Admin -e test@example.com -f admin -l admin --password admin
 
     # Webserver listen on 8881