diff --git a/docs/source/operators/config-cli.md b/docs/source/operators/config-cli.md index 39c44ae4..2eb80f92 100644 --- a/docs/source/operators/config-cli.md +++ b/docs/source/operators/config-cli.md @@ -366,4 +366,14 @@ RemoteMappingKernelManager(AsyncMappingKernelManager) options --RemoteMappingKernelManager.shared_context= Share a single zmq.Context to talk to all my kernels Default: True +--RemoteMappingKernelManager.kernel_launch_terminate_on_events=... + Comma-separated list of dictionaries, each describing an event by `type`, `reason`, and `timeout_in_seconds` + (e.g. [{"type": "Warning", "reason": "FailedMount", "timeout_in_seconds": 0}]). + Kernel pod events will be sampled during startup, and if an event described in this list is detected, + the kernel launch will be terminated after the set timeout. + Only available for container kernels. + Make sure to provide the exact event type and reason as they appear in your container platform. + A guide to finding the event type and reason in Kubernetes can be found here: + `https://www.bluematador.com/blog/kubernetes-events-explained` + Default: [] ``` diff --git a/enterprise_gateway/services/kernels/remotemanager.py b/enterprise_gateway/services/kernels/remotemanager.py index f906d68f..55f3685f 100644 --- a/enterprise_gateway/services/kernels/remotemanager.py +++ b/enterprise_gateway/services/kernels/remotemanager.py @@ -6,6 +6,7 @@ from __future__ import annotations import asyncio +import json import os import re import signal @@ -17,7 +18,8 @@ from jupyter_client.kernelspec import KernelSpec from jupyter_server.services.kernels.kernelmanager import AsyncMappingKernelManager from tornado import web -from traitlets import directional_link +from traitlets import List as ListTrait +from traitlets import default, directional_link from traitlets import log as traitlets_log from zmq import IO_THREADS, MAX_SOCKETS, Context @@ -161,6 +163,24 @@ class RemoteMappingKernelManager(AsyncMappingKernelManager): Extends the 
AsyncMappingKernelManager with support for managing remote kernels via the process-proxy. """ + kernel_launch_terminate_on_events_env = "EG_KERNEL_LAUNCH_TERMINATE_ON_EVENTS" + kernel_launch_terminate_on_events_default_value: ClassVar[list] = [] + kernel_launch_terminate_on_events = ListTrait( + default_value=kernel_launch_terminate_on_events_default_value, + config=True, + help="""Comma-separated list of dictionaries, each describing an event by `type`, `reason`, + and `timeout_in_seconds` (e.g. [{"type": "Warning", "reason": "FailedMount", "timeout_in_seconds": 0}]). + Kernel pod events will be sampled during startup, and if an event described in this list is detected, + the kernel launch will be terminated after the set timeout. Only available for container kernels. + Make sure to provide the exact event type and reason as it appears in your container platform, + A guide to finding the event type and reason in kubernetes can be found here: + `https://www.bluematador.com/blog/kubernetes-events-explained`""", + ) + + @default("kernel_launch_terminate_on_events") + def _kernel_launch_terminate_on_events_default(self) -> list: + return json.loads(os.getenv(self.kernel_launch_terminate_on_events_env, "[]")) + def _context_default(self) -> Context: """ We override the _context_default method in diff --git a/enterprise_gateway/services/processproxies/container.py b/enterprise_gateway/services/processproxies/container.py index 6378b633..a791e2d2 100644 --- a/enterprise_gateway/services/processproxies/container.py +++ b/enterprise_gateway/services/processproxies/container.py @@ -8,6 +8,7 @@ import abc import os import signal +from collections import defaultdict from typing import Any import urllib3 # docker ends up using this and it causes lots of noise, so turn off warnings @@ -46,6 +47,21 @@ def __init__(self, kernel_manager: RemoteKernelManager, proxy_config: dict): super().__init__(kernel_manager, proxy_config) self.container_name = "" self.assigned_node_ip = None + 
self.kernel_events_to_occurrence_time = {} + self._initialize_kernel_launch_terminate_on_events() + + def _initialize_kernel_launch_terminate_on_events(self): + """ + Parse the `kernel_launch_terminate_on_events` configuration, for easier access during startup. + [{"type": "Warning", "reason": "FailedMount", "timeout_in_seconds": 0}, + {"type": "Warning", "reason": "Unschedulable", "timeout_in_seconds": 30}] -> + {"Warning": {"FailedMount": 0, "Unschedulable": 30}} + """ + self.kernel_launch_terminate_on_events = defaultdict(dict) + for configuration in self.kernel_manager.parent.kernel_launch_terminate_on_events: + self.kernel_launch_terminate_on_events[configuration["type"]][ + configuration["reason"] + ] = configuration["timeout_in_seconds"] def _determine_kernel_images(self, **kwargs: dict[str, Any] | None) -> None: """ @@ -190,6 +206,7 @@ async def confirm_remote_startup(self) -> None: """Confirms the container has started and returned necessary connection information.""" self.log.debug("Trying to confirm kernel container startup status") self.start_time = RemoteProcessProxy.get_current_time() + self.kernel_events_to_occurrence_time = {} i = 0 ready_to_connect = False # we're ready to connect when we have a connection file to use while not ready_to_connect: @@ -203,6 +220,8 @@ async def confirm_remote_startup(self) -> None: http_status_code=500, reason=f"Error starting kernel container; status: '{container_status}'.", ) + elif container_status == "pending": + self._handle_pending_kernel() else: if self.assigned_host: ready_to_connect = await self.receive_connection_info() @@ -212,6 +231,38 @@ async def confirm_remote_startup(self) -> None: self.pgid = 0 else: self.detect_launch_failure() + self.kernel_events_to_occurrence_time = {} + + def _handle_pending_kernel(self): + """Sample container events and compare them to configured events which may cause termination. 
+ The event type and the event reason should match those sampled in the environment to initiate termination. + Possible event types: `Warning`, `Normal`. + Possible event reasons (may differ in different container platforms and versions): `FailedMount`, `FailedMountAttach`, + `FailedSchedule`, `ImagePullBackoff`, etc.""" + self.log.debug("Sampling kernel container events") + kernel_pod_events = self.get_container_events() + for event in kernel_pod_events: + if ( + event.type in self.kernel_launch_terminate_on_events + and event.reason in self.kernel_launch_terminate_on_events[event.type] + ): + event_key = f"{event.type}{event.reason}" + if event_key not in self.kernel_events_to_occurrence_time: + self.kernel_events_to_occurrence_time[event_key] = ( + RemoteProcessProxy.get_current_time() + ) + if ( + RemoteProcessProxy.get_time_diff( + RemoteProcessProxy.get_current_time(), + self.kernel_events_to_occurrence_time[event_key], + ) + >= self.kernel_launch_terminate_on_events[event.type][event.reason] + ): + self.kill() + self.log_and_raise( + http_status_code=409, + reason=f"Error starting kernel container; The container encountered an event which may cause a longer than usual startup: '{event.reason} - {event.message[:64]}'", + ) def get_process_info(self) -> dict[str, Any]: """Captures the base information necessary for kernel persistence relative to containers.""" @@ -243,6 +294,11 @@ def get_container_status(self, iteration: int | None) -> str: """Returns the current container state (in lowercase) or the empty string if not available.""" raise NotImplementedError + @abc.abstractmethod + def get_container_events(self) -> list: + """Returns a list of container events, or empty list if the container has no events.""" + raise NotImplementedError + @abc.abstractmethod def terminate_container_resources(self): """Terminate any artifacts created on behalf of the container's lifetime.""" diff --git a/enterprise_gateway/services/processproxies/k8s.py 
b/enterprise_gateway/services/processproxies/k8s.py index 00e6bf17..2e70b09e 100644 --- a/enterprise_gateway/services/processproxies/k8s.py +++ b/enterprise_gateway/services/processproxies/k8s.py @@ -110,6 +110,19 @@ def get_container_status(self, iteration: int | None) -> str: return pod_status + def get_container_events(self) -> list: + """Return container events""" + pod_events = [] + core_v1_api = client.CoreV1Api() + if self.container_name: + ret = core_v1_api.list_namespaced_event( + namespace=self.kernel_namespace, + field_selector=f"involvedObject.name={self.container_name}", + ) + if ret and ret.items: + pod_events = ret.items + return pod_events + def delete_managed_object(self, termination_stati: list[str]) -> bool: """Deletes the object managed by this process-proxy