Skip to content

Commit

Permalink
fix(backend): handle client side HTTP timeouts to fix crashes of meta…
Browse files Browse the repository at this point in the history
…data-writer. Fixes #8200 (#11361)

* Catch exceptions in case of client side timeouts of k8s_watch.stream, adjust timeouts according to the recommendations of the Kubernetes client authors.

Signed-off-by: Evgeniy Mamchenko <[email protected]>

* Remove catch-all except so that unexpected errors are not hidden from users.

Signed-off-by: Evgeniy Mamchenko <[email protected]>

* Add pass statement below server-side timeout comment for clarity.

Signed-off-by: Evgeniy Mamchenko <[email protected]>

* Improve the comment about the server-side timeout.

Signed-off-by: Evgeniy Mamchenko <[email protected]>

* Replace pass with continue in the handler of ReadTimeoutError.

Signed-off-by: Evgeniy Mamchenko <[email protected]>

---------

Signed-off-by: Evgeniy Mamchenko <[email protected]>
  • Loading branch information
OutSorcerer authored Nov 26, 2024
1 parent 11c5d7e commit 94a21cc
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
2 changes: 1 addition & 1 deletion backend/metadata_writer/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ grpcio==1.66.1
# via ml-metadata
idna==3.10
# via requests
kubernetes==10.0.1
kubernetes==31.0.0
# via -r -
lru-dict==1.3.0
# via -r -
Expand Down
28 changes: 18 additions & 10 deletions backend/metadata_writer/src/metadata_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import collections
import kubernetes
import yaml
import urllib3
from time import sleep
import lru

Expand All @@ -30,7 +31,10 @@
workflow_name_to_context_id_size = os.environ.get('WORKFLOW_NAME_TO_CONTEXT_ID_SIZE', 5000)
pods_with_written_metadata_size = os.environ.get('PODS_WITH_WRITTEN_METADATA_SIZE', 5000)
debug_files_size = os.environ.get('DEBUG_FILES_SIZE', 5000)

# See the documentation on settings k8s_watch timeouts:
# https://github.com/kubernetes-client/python/blob/master/examples/watch/timeout-settings.md
k8s_watch_server_side_timeout = os.environ.get('K8S_WATCH_SERVER_SIDE_TIMEOUT', 1800)
k8s_watch_client_side_timeout = os.environ.get('K8S_WATCH_CLIENT_SIDE_TIMEOUT', 60)

kubernetes.config.load_incluster_config()
k8s_api = kubernetes.client.CoreV1Api()
Expand Down Expand Up @@ -150,18 +154,18 @@ def is_kfp_v2_pod(pod) -> bool:
k8s_api.list_namespaced_pod,
namespace=namespace_to_watch,
label_selector=ARGO_WORKFLOW_LABEL_KEY,
timeout_seconds=1800, # Sometimes watch gets stuck
_request_timeout=2000, # Sometimes HTTP GET gets stuck
timeout_seconds=k8s_watch_server_side_timeout,
_request_timeout=k8s_watch_client_side_timeout,
)
else:
pod_stream = k8s_watch.stream(
k8s_api.list_pod_for_all_namespaces,
label_selector=ARGO_WORKFLOW_LABEL_KEY,
timeout_seconds=1800, # Sometimes watch gets stuck
_request_timeout=2000, # Sometimes HTTP GET gets stuck
timeout_seconds=k8s_watch_server_side_timeout,
_request_timeout=k8s_watch_client_side_timeout,
)
for event in pod_stream:
try:
try:
for event in pod_stream:
obj = event['object']
print('Kubernetes Pod event: ', event['type'], obj.metadata.name, obj.metadata.resource_version)
if event['type'] == 'ERROR':
Expand Down Expand Up @@ -393,6 +397,10 @@ def is_kfp_v2_pod(pod) -> bool:

pods_with_written_metadata[obj.metadata.name] = None

except Exception as e:
import traceback
print(traceback.format_exc())
# If the for loop ended, a server-side timeout occurred. Continue watching.
pass

except urllib3.exceptions.ReadTimeoutError as e:
# Client side timeout, continue watching.
continue

0 comments on commit 94a21cc

Please sign in to comment.