diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 2d0198632f3bf..d24fd4f151691 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -25,9 +25,11 @@ from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple import pendulum +from deprecated import deprecated from airflow.cli.cli_config import DefaultHelpParser from airflow.configuration import conf +from airflow.exceptions import RemovedInAirflow3Warning from airflow.executors.executor_loader import ExecutorLoader from airflow.models import Log from airflow.stats import Stats @@ -552,7 +554,12 @@ def terminate(self): """Get called when the daemon receives a SIGTERM.""" raise NotImplementedError - def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover + @deprecated( + reason="Replaced by function `revoke_task`.", + category=RemovedInAirflow3Warning, + action="ignore", + ) + def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: """ Handle remnants of tasks that were failed because they were stuck in queued. @@ -563,7 +570,23 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # p :param tis: List of Task Instances to clean up :return: List of readable task instances for a warning message """ - raise NotImplementedError() + raise NotImplementedError + + def revoke_task(self, *, ti: TaskInstance): + """ + Attempt to remove task from executor. + + It should attempt to ensure that the task is no longer running on the worker, + and ensure that it is cleared out from internal data structures. + + It should *not* change the state of the task in airflow, or add any events + to the event buffer. + + It should not raise any error. + + :param ti: Task instance to remove + """ + raise NotImplementedError def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]: """ diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 6c7887c643cb5..de42ee01afdac 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -25,12 +25,14 @@ import sys import time from collections import Counter, defaultdict, deque +from contextlib import suppress from datetime import timedelta from functools import lru_cache, partial from itertools import groupby from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator +from deprecated import deprecated from sqlalchemy import and_, delete, exists, func, not_, select, text, update from sqlalchemy.exc import OperationalError from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload @@ -40,7 +42,7 @@ from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest from airflow.callbacks.pipe_callback_sink import PipeCallbackSink from airflow.configuration import conf -from airflow.exceptions import UnknownExecutorException +from airflow.exceptions import RemovedInAirflow3Warning, UnknownExecutorException from airflow.executors.executor_loader import ExecutorLoader from airflow.jobs.base_job_runner import BaseJobRunner from airflow.jobs.job import Job, perform_heartbeat @@ -99,6 +101,9 @@ DR = DagRun DM = DagModel +TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule" +""":meta private:""" + class ConcurrencyMap: """ @@ -184,8 +189,15 @@ def __init__( self._standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor") 
self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration") self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout") - self._enable_tracemalloc = conf.getboolean("scheduler", "enable_tracemalloc") + + # this param is intentionally undocumented + self._num_stuck_queued_retries = conf.getint( + section="scheduler", + key="num_stuck_in_queued_retries", + fallback=2, + ) + if self._enable_tracemalloc: import tracemalloc @@ -1046,7 +1058,7 @@ def _run_scheduler_loop(self) -> None: timers.call_regular_interval( conf.getfloat("scheduler", "task_queued_timeout_check_interval"), - self._fail_tasks_stuck_in_queued, + self._handle_tasks_stuck_in_queued, ) timers.call_regular_interval( @@ -1098,6 +1110,7 @@ def _run_scheduler_loop(self) -> None: for executor in self.job.executors: try: # this is backcompat check if executor does not inherit from BaseExecutor + # todo: remove in airflow 3.0 if not hasattr(executor, "_task_event_logs"): continue with create_session() as session: @@ -1767,48 +1780,132 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackReques self.log.debug("callback is empty") @provide_session - def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: + def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: """ - Mark tasks stuck in queued for longer than `task_queued_timeout` as failed. + Handle the scenario where a task is queued for longer than `task_queued_timeout`. Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses track of a task, a cluster can't further scale up its workers, etc.), but tasks - should not be stuck in queued for a long time. This will mark tasks stuck in - queued for longer than `self._task_queued_timeout` as failed. If the task has - available retries, it will be retried. + should not be stuck in queued for a long time. + + We will attempt to requeue the task (by revoking it from executor and setting to + scheduled) up to 2 times before failing the task. """ - self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method") + tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session) + for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items(): + try: + for ti in stuck_tis: + executor.revoke_task(ti=ti) + self._maybe_requeue_stuck_ti( + ti=ti, + session=session, + ) + except NotImplementedError: + # this block only gets entered if the executor has not implemented `revoke_task`. + # in which case, we try the fallback logic + # todo: remove the call to _stuck_in_queued_backcompat_logic in airflow 3.0. + # after 3.0, `cleanup_stuck_queued_tasks` will be removed, so we should + # just continue immediately. + self._stuck_in_queued_backcompat_logic(executor, stuck_tis) + continue - tasks_stuck_in_queued = session.scalars( + def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]: + """Query db for TIs that are stuck in queued.""" + return session.scalars( select(TI).where( TI.state == TaskInstanceState.QUEUED, TI.queued_dttm < (timezone.utcnow() - timedelta(seconds=self._task_queued_timeout)), TI.queued_by_job_id == self.job.id, ) - ).all() + ) - for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items(): - try: - cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis)) - for ti in stuck_tis: - if repr(ti) in cleaned_up_task_instances: - self.log.warning( - "Marking task instance %s stuck in queued as failed. 
" - "If the task instance has available retries, it will be retried.", - ti, - ) - session.add( - Log( - event="stuck in queued", - task_instance=ti.key, - extra=( - "Task will be marked as failed. If the task instance has " - "available retries, it will be retried." - ), - ) - ) - except NotImplementedError: - self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.") + def _maybe_requeue_stuck_ti(self, *, ti, session): + """ + Requeue task if it has not been attempted too many times. + + Otherwise, fail it. + """ + num_times_stuck = self._get_num_times_stuck_in_queued(ti, session) + if num_times_stuck < self._num_stuck_queued_retries: + self.log.info("Task stuck in queued; will try to requeue. task_id=%s", ti.task_id) + session.add( + Log( + event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT, + task_instance=ti.key, + extra=( + f"Task was in queued state for longer than {self._task_queued_timeout} " + "seconds; task state will be set back to scheduled." + ), + ) + ) + self._reschedule_stuck_task(ti) + else: + self.log.info( + "Task requeue attempts exceeded max; marking failed. task_instance=%s", + ti, + ) + session.add( + Log( + event="stuck in queued tries exceeded", + task_instance=ti.key, + extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.", + ) + ) + ti.set_state(TaskInstanceState.FAILED, session=session) + + @deprecated( + reason="This is backcompat layer for older executor interface. Should be removed in 3.0", + category=RemovedInAirflow3Warning, + action="ignore", + ) + def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis): + """ + Try to invoke stuck in queued cleanup for older executor interface. + + TODO: remove in airflow 3.0 + + Here we handle case where the executor pre-dates the interface change that + introduced `cleanup_tasks_stuck_in_queued` and deprecated `cleanup_stuck_queued_tasks`. + + """ + with suppress(NotImplementedError): + for ti_repr in executor.cleanup_stuck_queued_tasks(tis=stuck_tis): + self.log.warning( + "Task instance %s stuck in queued. Will be set to failed.", + ti_repr, + ) + + @provide_session + def _reschedule_stuck_task(self, ti, session=NEW_SESSION): + session.execute( + update(TI) + .where(TI.filter_for_tis([ti])) + .values( + state=TaskInstanceState.SCHEDULED, + queued_dttm=None, + ) + .execution_options(synchronize_session=False) + ) + + @provide_session + def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int: + """ + Check the Log table to see how many times a taskinstance has been stuck in queued. + + We can then use this information to determine whether to reschedule a task or fail it. 
+        """
+        return (
+            session.query(Log)
+            .where(
+                Log.task_id == ti.task_id,
+                Log.dag_id == ti.dag_id,
+                Log.run_id == ti.run_id,
+                Log.map_index == ti.map_index,
+                Log.try_number == ti.try_number,
+                Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
+            )
+            .count()
+        )
 
     @provide_session
     def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
@@ -2167,7 +2264,7 @@ def _activate_assets_generate_warnings() -> Iterator[tuple[str, str]]:
                 session.add(warning)
                 existing_warned_dag_ids.add(warning.dag_id)
 
-    def _executor_to_tis(self, tis: list[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
+    def _executor_to_tis(self, tis: Iterable[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
         """Organize TIs into lists per their respective executor."""
         _executor_to_tis: defaultdict[BaseExecutor, list[TaskInstance]] = defaultdict(list)
         for ti in tis:
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index cc927fa15a3d3..32787428cc5ce 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1375,6 +1375,7 @@ repos
 repr
 req
 reqs
+requeued
 Reserialize
 reserialize
 reserialized
diff --git a/providers/src/airflow/providers/celery/executors/celery_executor.py b/providers/src/airflow/providers/celery/executors/celery_executor.py
index 807c77ab98782..43ae2cc213399 100644
--- a/providers/src/airflow/providers/celery/executors/celery_executor.py
+++ b/providers/src/airflow/providers/celery/executors/celery_executor.py
@@ -35,6 +35,7 @@ from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple
 from celery import states as celery_states
+from deprecated import deprecated
 from packaging.version import Version
 from airflow import __version__ as airflow_version
@@ -52,7 +53,7 @@
     lazy_load_command,
 )
 from airflow.configuration import conf
-from airflow.exceptions import AirflowTaskTimeout
+from airflow.exceptions import AirflowProviderDeprecationWarning, AirflowTaskTimeout
 from airflow.executors.base_executor import BaseExecutor
 from airflow.stats import Stats
 from airflow.utils.state import TaskInstanceState
@@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
 
         return not_adopted_tis
 
+    @deprecated(
+        reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.",
+        category=AirflowProviderDeprecationWarning,
+    )
     def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
         """
-        Handle remnants of tasks that were failed because they were stuck in queued.
+        Remove tasks stuck in queued from executor and fail them.
 
-        Tasks can get stuck in queued. If such a task is detected, it will be marked
-        as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
-        if it doesn't.
-
-        :param tis: List of Task Instances to clean up
-        :return: List of readable task instances for a warning message
+        This method is deprecated. Use `revoke_task` instead.
""" - readable_tis = [] + reprs = [] + for ti in tis: + reprs.append(repr(ti)) + self.revoke_task(ti=ti) + self.fail(ti.key) + return reprs + + def revoke_task(self, *, ti: TaskInstance): from airflow.providers.celery.executors.celery_executor_utils import app - for ti in tis: - readable_tis.append(repr(ti)) - task_instance_key = ti.key - self.fail(task_instance_key, None) - celery_async_result = self.tasks.pop(task_instance_key, None) - if celery_async_result: - try: - app.control.revoke(celery_async_result.task_id) - except Exception as ex: - self.log.error("Error revoking task instance %s from celery: %s", task_instance_key, ex) - return readable_tis + celery_async_result = self.tasks.pop(ti.key, None) + if celery_async_result: + try: + app.control.revoke(celery_async_result.task_id) + except Exception: + self.log.exception("Error revoking task instance %s from celery", ti.key) + self.running.discard(ti.key) + self.queued_tasks.pop(ti.key, None) @staticmethod def get_cli_commands() -> list[GroupCommand]: diff --git a/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py b/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py index a8c69871ab9c3..3715a37d3d86d 100644 --- a/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -20,15 +20,16 @@ from functools import cached_property from typing import TYPE_CHECKING, Sequence +from deprecated import deprecated + from airflow.configuration import conf +from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning from airflow.executors.base_executor import BaseExecutor from airflow.providers.celery.executors.celery_executor import CeleryExecutor try: from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor except ImportError as e: - from airflow.exceptions import AirflowOptionalProviderFeatureException - raise AirflowOptionalProviderFeatureException(e) from airflow.utils.providers_configuration_loader import providers_configuration_loaded @@ -246,6 +247,11 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task *self.kubernetes_executor.try_adopt_task_instances(kubernetes_tis), ] + @deprecated( + reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.", + category=AirflowProviderDeprecationWarning, + action="ignore", # ignoring since will get warning from the nested executors + ) def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: celery_tis = [ti for ti in tis if ti.queue != self.kubernetes_queue] kubernetes_tis = [ti for ti in tis if ti.queue == self.kubernetes_queue] @@ -254,6 +260,25 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: *self.kubernetes_executor.cleanup_stuck_queued_tasks(kubernetes_tis), ] + def revoke_task(self, *, ti: TaskInstance): + if ti.queue == self.kubernetes_queue: + try: + self.kubernetes_executor.revoke_task(ti=ti) + except NotImplementedError: + self.log.warning( + "Your kubernetes provider version is old. Falling back to deprecated " + "function, `cleanup_stuck_queued_tasks`. You must upgrade k8s " + "provider to enable 'stuck in queue' retries and stuck in queue " + "event logging." + ) + for ti_repr in self.kubernetes_executor.cleanup_stuck_queued_tasks(tis=[ti]): + self.log.info( + "task stuck in queued and will be marked failed. 
task_instance=%s", + ti_repr, + ) + else: + self.celery_executor.revoke_task(ti=ti) + def end(self) -> None: """End celery and kubernetes executor.""" self.celery_executor.end() diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 3300f3cf96d4a..c465548fddf67 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -35,6 +35,7 @@ from queue import Empty, Queue from typing import TYPE_CHECKING, Any, Sequence +from deprecated import deprecated from kubernetes.dynamic import DynamicClient from sqlalchemy import or_, select, update @@ -56,6 +57,7 @@ positive_int, ) from airflow.configuration import conf +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.executors.base_executor import BaseExecutor from airflow.executors.executor_constants import KUBERNETES_EXECUTOR from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( @@ -610,6 +612,10 @@ def _iter_tis_to_flush(): tis_to_flush.extend(_iter_tis_to_flush()) return tis_to_flush + @deprecated( + reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.", + category=AirflowProviderDeprecationWarning, + ) def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: """ Handle remnants of tasks that were failed because they were stuck in queued. @@ -621,28 +627,39 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: :param tis: List of Task Instances to clean up :return: List of readable task instances for a warning message """ + reprs = [] + for ti in tis: + reprs.append(repr(ti)) + self.revoke_task(ti=ti) + self.fail(ti.key) + return reprs + + def revoke_task(self, *, ti: TaskInstance): + """ + Revoke task that may be running. 
+
+        :param ti: task instance to revoke
+        """
         if TYPE_CHECKING:
             assert self.kube_client
             assert self.kube_scheduler
 
-        readable_tis: list[str] = []
-        if not tis:
-            return readable_tis
+        self.running.discard(ti.key)
+        self.queued_tasks.pop(ti.key, None)
         pod_combined_search_str_to_pod_map = self.get_pod_combined_search_str_to_pod_map()
-        for ti in tis:
-            # Build the pod selector
-            base_label_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}"
-            if ti.map_index >= 0:
-                # Old tasks _couldn't_ be mapped, so we don't have to worry about compat
-                base_label_selector += f",map_index={ti.map_index}"
-
-            search_str = f"{base_label_selector},run_id={ti.run_id}"
-            pod = pod_combined_search_str_to_pod_map.get(search_str, None)
-            if not pod:
-                self.log.warning("Cannot find pod for ti %s", ti)
-                continue
-            readable_tis.append(repr(ti))
-            self.kube_scheduler.delete_pod(pod_name=pod.metadata.name, namespace=pod.metadata.namespace)
-        return readable_tis
+        # Build the pod selector
+        base_label_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}"
+        if ti.map_index >= 0:
+            # Old tasks _couldn't_ be mapped, so we don't have to worry about compat
+            base_label_selector += f",map_index={ti.map_index}"
+
+        search_str = f"{base_label_selector},run_id={ti.run_id}"
+        pod = pod_combined_search_str_to_pod_map.get(search_str, None)
+        if not pod:
+            self.log.warning("Cannot find pod for ti %s", ti)
+            return
+
+        self.kube_scheduler.patch_pod_revoked(pod_name=pod.metadata.name, namespace=pod.metadata.namespace)
+        self.kube_scheduler.delete_pod(pod_name=pod.metadata.name, namespace=pod.metadata.namespace)
 
     def adopt_launched_task(
         self,
diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
index 422913629802c..1e29861c6c456 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
@@ -35,3 +35,13 @@
 ALL_NAMESPACES = "ALL_NAMESPACES"
 
 POD_EXECUTOR_DONE_KEY = "airflow_executor_done"
+
+POD_REVOKED_KEY = "airflow_pod_revoked"
+"""Label to indicate pod revoked by executor.
+
+When the executor revokes a task, the resulting pod deletion is intentional,
+so the watcher should not treat it as an external deletion; events for a
+revoked pod are therefore ignored.
+ +:meta private: +""" diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index ace65466b23f4..ad0207cdfa3b3 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -33,6 +33,7 @@ ADOPTED, ALL_NAMESPACES, POD_EXECUTOR_DONE_KEY, + POD_REVOKED_KEY, ) from airflow.providers.cncf.kubernetes.kube_client import get_kube_client from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import ( @@ -205,9 +206,13 @@ def process_status( resource_version: str, event: Any, ) -> None: + """Process status response.""" pod = event["object"] + + if POD_REVOKED_KEY in pod.metadata.labels.keys(): + return + annotations_string = annotations_for_logging_task_metadata(annotations) - """Process status response.""" if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp: # This will happen only when the task pods are adopted by another executor. # So, there is no change in the pod state. @@ -433,7 +438,7 @@ def run_next(self, next_job: KubernetesJobType) -> None: def delete_pod(self, pod_name: str, namespace: str) -> None: """Delete Pod from a namespace; does not raise if it does not exist.""" try: - self.log.debug("Deleting pod %s in namespace %s", pod_name, namespace) + self.log.info("Deleting pod %s in namespace %s", pod_name, namespace) self.kube_client.delete_namespaced_pod( pod_name, namespace, @@ -445,6 +450,26 @@ def delete_pod(self, pod_name: str, namespace: str) -> None: if str(e.status) != "404": raise + def patch_pod_revoked(self, *, pod_name: str, namespace: str): + """ + Patch the pod with a label that ensures it's ignored by the kubernetes watcher. + + :meta private: + """ + self.log.info( + "Patching pod %s in namespace %s to note that we are revoking the task.", + pod_name, + namespace, + ) + try: + self.kube_client.patch_namespaced_pod( + name=pod_name, + namespace=namespace, + body={"metadata": {"labels": {POD_REVOKED_KEY: "True"}}}, + ) + except ApiException: + self.log.warning("Failed to patch pod %s with pod revoked key.", pod_name, exc_info=True) + def patch_pod_executor_done(self, *, pod_name: str, namespace: str): """Add a "done" annotation to ensure we don't continually adopt pods.""" self.log.debug("Patching pod %s in namespace %s to mark it as done", pod_name, namespace) diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index d24a59a95d102..fa1e584971ef5 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Sequence +from deprecated import deprecated + from airflow.configuration import conf +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.executors.base_executor import BaseExecutor from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor @@ -230,12 +233,21 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task *self.kubernetes_executor.try_adopt_task_instances(kubernetes_tis), ] + @deprecated( + reason="Replaced by function `revoke_task`. 
Upgrade airflow core to make this go away.", + category=AirflowProviderDeprecationWarning, + action="ignore", # ignoring since will get warning from the nested executors + ) def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # LocalExecutor doesn't have a cleanup_stuck_queued_tasks method, so we # will only run KubernetesExecutor's kubernetes_tis = [ti for ti in tis if ti.queue == self.KUBERNETES_QUEUE] return self.kubernetes_executor.cleanup_stuck_queued_tasks(kubernetes_tis) + def revoke_task(self, *, ti: TaskInstance): + if ti.queue == self.KUBERNETES_QUEUE: + self.kubernetes_executor.revoke_task(ti=ti) + def end(self) -> None: """End local and kubernetes executor.""" self.local_executor.end() diff --git a/providers/tests/celery/executors/test_celery_executor.py b/providers/tests/celery/executors/test_celery_executor.py index 2fa72deab0aab..aa8dddcb363d6 100644 --- a/providers/tests/celery/executors/test_celery_executor.py +++ b/providers/tests/celery/executors/test_celery_executor.py @@ -255,11 +255,43 @@ def test_cleanup_stuck_queued_tasks(self, mock_fail): executor.job_id = 1 executor.running = {ti.key} executor.tasks = {ti.key: AsyncResult("231")} - executor.cleanup_stuck_queued_tasks(tis) + assert executor.has_task(ti) + with pytest.warns(DeprecationWarning): + executor.cleanup_stuck_queued_tasks(tis=tis) executor.sync() assert executor.tasks == {} app.control.revoke.assert_called_once_with("231") - mock_fail.assert_called_once() + mock_fail.assert_called() + assert not executor.has_task(ti) + + @pytest.mark.backend("mysql", "postgres") + @mock.patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.fail") + def test_revoke_task(self, mock_fail): + start_date = timezone.utcnow() - timedelta(days=2) + + with DAG("test_revoke_task", schedule=None): + task = BaseOperator(task_id="task_1", start_date=start_date) + + ti = TaskInstance(task=task, run_id=None) + ti.external_executor_id = "231" + ti.state = State.QUEUED + ti.queued_dttm = timezone.utcnow() - timedelta(minutes=30) + ti.queued_by_job_id = 1 + tis = [ti] + with _prepare_app() as app: + app.control.revoke = mock.MagicMock() + executor = celery_executor.CeleryExecutor() + executor.job_id = 1 + executor.running = {ti.key} + executor.tasks = {ti.key: AsyncResult("231")} + assert executor.has_task(ti) + for ti in tis: + executor.revoke_task(ti=ti) + executor.sync() + app.control.revoke.assert_called_once_with("231") + assert executor.tasks == {} + assert not executor.has_task(ti) + mock_fail.assert_not_called() @conf_vars({("celery", "result_backend_sqlalchemy_engine_options"): '{"pool_recycle": 1800}'}) @mock.patch("celery.Celery") diff --git a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py index 14785fab37b9e..e5cc8619a97b8 100644 --- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1220,7 +1220,14 @@ def test_not_adopt_unassigned_task(self, mock_kube_client): @pytest.mark.db_test @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") def test_cleanup_stuck_queued_tasks(self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session): - """Delete any pods associated with a task stuck in queued.""" + """ + This verifies legacy behavior. Remove when removing ``cleanup_stuck_queued_tasks``. 
+
+        It's expected that the method ``cleanup_stuck_queued_tasks`` will patch the pod
+        so that it is ignored by the watcher, delete the pod, remove the task from the
+        running set, and fail the task.
+
+        """
         mock_kube_client = mock.MagicMock()
         mock_kube_dynamic_client.return_value = mock.MagicMock()
         mock_pod_resource = mock.MagicMock()
@@ -1261,8 +1268,64 @@ def test_cleanup_stuck_queued_tasks(self, mock_kube_dynamic_client, dag_maker, c
         executor.kube_scheduler = mock.MagicMock()
         ti.refresh_from_db()
         tis = [ti]
-        executor.cleanup_stuck_queued_tasks(tis)
+        with pytest.warns(DeprecationWarning):
+            executor.cleanup_stuck_queued_tasks(tis=tis)
+        executor.kube_scheduler.delete_pod.assert_called_once()
+        assert executor.running == set()
+
+    @pytest.mark.db_test
+    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+    def test_revoke_task(self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session):
+        """
+        It's expected that ``revoke_task`` will patch the pod so that it is ignored by
+        the watcher, delete the pod, and remove the task from the running set.
+        """
+        mock_kube_client = mock.MagicMock()
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_pod_resource = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
+        mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(
+            items=[
+                k8s.V1Pod(
+                    metadata=k8s.V1ObjectMeta(
+                        annotations={
+                            "dag_id": "test_cleanup_stuck_queued_tasks",
+                            "task_id": "bash",
+                            "run_id": "test",
+                            "try_number": 0,
+                        },
+                        labels={
+                            "role": "airflow-worker",
+                            "dag_id": "test_cleanup_stuck_queued_tasks",
+                            "task_id": "bash",
+                            "airflow-worker": 123,
+                            "run_id": "test",
+                            "try_number": 0,
+                        },
+                    ),
+                    status=k8s.V1PodStatus(phase="Pending"),
+                )
+            ]
+        )
+        create_dummy_dag(dag_id="test_cleanup_stuck_queued_tasks", task_id="bash", with_dagrun_type=None)
+        dag_run = dag_maker.create_dagrun()
+        ti = dag_run.task_instances[0]
+        ti.state = State.QUEUED
+        ti.queued_by_job_id = 123
+        session.flush()
+
+        executor = self.kubernetes_executor
+        executor.job_id = 123
+        executor.kube_client = mock_kube_client
+        executor.kube_scheduler = mock.MagicMock()
+        ti.refresh_from_db()
+        executor.running.add(ti.key)  # so we can verify it gets removed after revoke
+        assert executor.has_task(task_instance=ti)
+        executor.revoke_task(ti=ti)
+        assert not executor.has_task(task_instance=ti)
+        executor.kube_scheduler.patch_pod_revoked.assert_called_once()
         executor.kube_scheduler.delete_pod.assert_called_once()
+        mock_kube_client.patch_namespaced_pod.calls[0] == []
         assert executor.running == set()
 
     @pytest.mark.parametrize(
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 2975fc49df5d2..8c9096c6849e6 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -28,6 +28,7 @@
 from typing import Generator
 from unittest import mock
 from unittest.mock import MagicMock, PropertyMock, patch
+from uuid import uuid4
 
 import pendulum
 import psutil
@@ -59,6 +60,7 @@
 from airflow.models.dagrun import DagRun
 from airflow.models.dagwarning import DagWarning
 from airflow.models.db_callback_request import DbCallbackRequest
+from airflow.models.log import Log
 from airflow.models.pool import Pool
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
@@ -124,6 +126,19 @@ def load_examples():
 
 # Patch the MockExecutor into the dict of known executors in the Loader
+@contextlib.contextmanager
+def _loader_mock(mock_executors):
+    with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock:
+        # The executors are mocked, so cannot be loaded/imported. Mock load_executor and return the
+        # correct object for the given input executor name.
+        loader_mock.side_effect = lambda *x: {
+            ("default_exec",): mock_executors[0],
+            (None,): mock_executors[0],
+            ("secondary_exec",): mock_executors[1],
+        }[x]
+        yield
+
+
 @patch.dict(
     ExecutorLoader.executors, {MOCK_EXECUTOR: f"{MockExecutor.__module__}.{MockExecutor.__qualname__}"}
 )
@@ -2189,7 +2204,18 @@ def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker, mock_
 
         # Second executor called for ti3
         mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3])
 
-    def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors):
+    def test_handle_stuck_queued_tasks_backcompat(self, dag_maker, session, mock_executors):
+        """
+        Verify backward compatibility of the executor interface w.r.t. stuck-in-queued tasks.
+
+        Prior to #43520, the scheduler called the executor method `cleanup_stuck_queued_tasks`, which failed the TIs.
+
+        After #43520, the scheduler calls the executor method `revoke_task` and requeues the TIs itself.
+
+        In Airflow 3.0, we should remove backcompat support for the old function. But for now
+        we verify that we call it as a fallback.
+        """
+        # todo: remove in airflow 3.0
         with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
             op1 = EmptyOperator(task_id="op1")
             op2 = EmptyOperator(task_id="op2", executor="default_exec")
@@ -2206,26 +2232,102 @@ def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors):
         scheduler_job = Job()
         job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
         job_runner._task_queued_timeout = 300
+        mock_exec_1 = mock_executors[0]
+        mock_exec_2 = mock_executors[1]
+        mock_exec_1.revoke_task.side_effect = NotImplementedError
+        mock_exec_2.revoke_task.side_effect = NotImplementedError
 
         with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock:
             # The executors are mocked, so cannot be loaded/imported. Mock load_executor and return the
             # correct object for the given input executor name.
loader_mock.side_effect = lambda *x: { - ("default_exec",): mock_executors[0], - (None,): mock_executors[0], - ("secondary_exec",): mock_executors[1], + ("default_exec",): mock_exec_1, + (None,): mock_exec_1, + ("secondary_exec",): mock_exec_2, }[x] - job_runner._fail_tasks_stuck_in_queued() + job_runner._handle_tasks_stuck_in_queued() # Default executor is called for ti1 (no explicit executor override uses default) and ti2 (where we # explicitly marked that for execution by the default executor) try: - mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti1, ti2]) + mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti1, ti2]) except AssertionError: - mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1]) - mock_executors[1].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3]) + mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1]) + mock_exec_2.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3]) + + @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"}) + def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, session, mock_executors): + """Verify that tasks stuck in queued will be rescheduled up to N times.""" + with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"): + EmptyOperator(task_id="op1") + EmptyOperator(task_id="op2", executor="default_exec") + + def _queue_tasks(tis): + for ti in tis: + ti.state = "queued" + ti.queued_dttm = timezone.utcnow() + session.commit() + + run_id = str(uuid4()) + dr = dag_maker.create_dagrun(run_id=run_id) + + tis = dr.get_task_instances(session=session) + _queue_tasks(tis=tis) + scheduler_job = Job() + scheduler = SchedulerJobRunner(job=scheduler_job, num_runs=0) + # job_runner._reschedule_stuck_task = MagicMock() + scheduler._task_queued_timeout = -300 # always in violation of timeout + + with _loader_mock(mock_executors): + scheduler._handle_tasks_stuck_in_queued(session=session) + + # If the task gets stuck in queued once, we reset it to scheduled + tis = dr.get_task_instances(session=session) + assert [x.state for x in tis] == ["scheduled", "scheduled"] + assert [x.queued_dttm for x in tis] == [None, None] + + _queue_tasks(tis=tis) + log_events = [x.event for x in session.scalars(select(Log)).all()] + assert log_events == [ + "stuck in queued reschedule", + "stuck in queued reschedule", + ] + + with _loader_mock(mock_executors): + scheduler._handle_tasks_stuck_in_queued(session=session) + session.commit() + + log_events = [x.event for x in session.scalars(select(Log).where(Log.run_id == run_id)).all()] + assert log_events == [ + "stuck in queued reschedule", + "stuck in queued reschedule", + "stuck in queued reschedule", + "stuck in queued reschedule", + ] + mock_executors[0].fail.assert_not_called() + tis = dr.get_task_instances(session=session) + assert [x.state for x in tis] == ["scheduled", "scheduled"] + _queue_tasks(tis=tis) + + with _loader_mock(mock_executors): + scheduler._handle_tasks_stuck_in_queued(session=session) + session.commit() + log_events = [x.event for x in session.scalars(select(Log).where(Log.run_id == run_id)).all()] + assert log_events == [ + "stuck in queued reschedule", + "stuck in queued reschedule", + "stuck in queued reschedule", + "stuck in queued reschedule", + "stuck in queued tries exceeded", + "stuck in queued tries exceeded", + ] + + mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method + states = [x.state for x in 
dr.get_task_instances(session=session)]
+        assert states == ["failed", "failed"]
 
-    def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session, caplog):
+    def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog):
+        """Test that if the executor does not implement `revoke_task`, we don't blow up."""
         with dag_maker("test_fail_stuck_queued_tasks"):
             op1 = EmptyOperator(task_id="op1")
 
@@ -2236,12 +2338,14 @@ def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session
         session.commit()
 
         from airflow.executors.local_executor import LocalExecutor
 
+        assert "revoke_task" in BaseExecutor.__dict__
+        # this is just verifying that LocalExecutor is good enough for this test
+        # in that it does not implement revoke_task
+        assert "revoke_task" not in LocalExecutor.__dict__
         scheduler_job = Job(executor=LocalExecutor())
         job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
         job_runner._task_queued_timeout = 300
-        with caplog.at_level(logging.DEBUG):
-            job_runner._fail_tasks_stuck_in_queued()
-        assert "Executor doesn't support cleanup of stuck queued tasks. Skipping." in caplog.text
+        job_runner._handle_tasks_stuck_in_queued()
 
     @mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
     def test_executor_end_called(self, mock_processor_agent, mock_executors):
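
The patch above leaves `revoke_task` as a bare `raise NotImplementedError` on `BaseExecutor`, with the scheduler falling back to the deprecated `cleanup_stuck_queued_tasks` when the hook is missing. The sketch below is not part of the patch; it illustrates, under stated assumptions, how a third-party executor might implement the new hook. The class `MyQueueExecutor`, its `_external_job_ids` mapping, and `_cancel_on_backend` are hypothetical; only `BaseExecutor`, its `running` and `queued_tasks` attributes, and the keyword-only `ti` parameter come from the diff. Per the contract documented in `base_executor.py`, the hook should stop the task on the worker and clear internal bookkeeping, but must not change task state in Airflow, add events to the event buffer, or raise; the scheduler's `_handle_tasks_stuck_in_queued` then decides, based on the `Log` rows it writes with the "stuck in queued reschedule" event, whether to set the task back to scheduled or fail it.

from __future__ import annotations

from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance


class MyQueueExecutor(BaseExecutor):
    """Hypothetical executor; only the revoke_task hook is sketched here."""

    def __init__(self):
        super().__init__()
        # Assumed bookkeeping of this example executor: TaskInstanceKey -> backend job id.
        self._external_job_ids: dict = {}

    def revoke_task(self, *, ti: TaskInstance):
        # Best effort: stop the task on the worker if we still know about it.
        job_id = self._external_job_ids.pop(ti.key, None)
        if job_id is not None:
            try:
                self._cancel_on_backend(job_id)  # hypothetical backend call
            except Exception:
                # Contract: revoke_task should not raise.
                self.log.exception("Could not cancel backend job %s for %s", job_id, ti.key)
        # Clear executor-side state; `running` and `queued_tasks` exist on BaseExecutor.
        self.running.discard(ti.key)
        self.queued_tasks.pop(ti.key, None)
        # Deliberately no self.fail()/self.success() and no event-buffer writes here;
        # the scheduler decides whether to requeue or fail the task instance.

    def _cancel_on_backend(self, job_id) -> None:
        raise NotImplementedError("backend-specific cancellation goes here")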