Skip to content

Commit

Permalink
Resolve periodic socket.timeout causing control channel message drops
Browse files Browse the repository at this point in the history
  • Loading branch information
maico committed Apr 24, 2024
1 parent d01e84a commit 5791f1a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 30 deletions.
36 changes: 21 additions & 15 deletions etc/kernel-launchers/R/scripts/server_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import logging
import os
import random
import select
import socket
import uuid
from threading import Thread
from typing import Any, Dict, Optional

from Cryptodome.Cipher import AES, PKCS1_v1_5
from Cryptodome.PublicKey import RSA
Expand Down Expand Up @@ -169,30 +171,34 @@ def _get_candidate_port(lower_port, upper_port):
return random.randint(lower_port, upper_port)


def get_server_request(sock: socket.socket) -> Optional[Dict[str, Any]]:
    """Gets a request from the server and returns the corresponding dictionary.

    This code also exists in the Python kernel-launcher's launch_ipykernel.py script.

    Blocks until a connection is pending on ``sock``, accepts it, reads until the
    peer closes its end, and parses the received bytes as UTF-8 encoded JSON.

    :param sock: the listening control-channel socket.
    :return: the decoded JSON request, or ``None`` when the peer sent no data,
        the socket reported an exceptional condition, or any error occurred
        while accepting, reading, or parsing (errors are logged, not raised).
    """
    conn: Optional[socket.socket] = None
    try:
        # Enterprise gateway establishes a new connection for every request.
        # Wait until a new connection is made, before accepting and reading.
        # The socket is also passed in the exceptional-conditions list: with an
        # empty list there, select() can never report anything in except_ready.
        input_ready, _, except_ready = select.select([sock], [], [sock])
        if input_ready:
            conn, addr = sock.accept()
            logger.info(f"Accepted connection on control channel from {addr=}")
            # Collect raw bytes and decode once at the end; decoding each chunk
            # separately fails when a multi-byte UTF-8 character straddles a
            # recv() boundary.
            chunks = []
            while chunk := conn.recv(1024):  # empty bytes => send is complete
                chunks.append(chunk)
            payload = b"".join(chunks)
            return json.loads(payload.decode("utf-8")) if payload else None

        if except_ready:
            logger.error("Control channel socket reported error state")

    except Exception:
        # Deliberately broad: a failure on one request must not propagate to
        # the caller. Decode and JSON errors end up here as well.
        logger.exception("Control channel socket closed unexpectedly")
    finally:
        if conn:
            conn.close()

    return None


def server_listener(sock, parent_pid):
Expand Down
36 changes: 21 additions & 15 deletions etc/kernel-launchers/python/scripts/launch_ipykernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import logging
import os
import random
import select
import signal
import socket
import tempfile
import uuid
from multiprocessing import Process
from threading import Thread
from typing import Optional, Dict, Any

from Cryptodome.Cipher import AES, PKCS1_v1_5
from Cryptodome.PublicKey import RSA
Expand Down Expand Up @@ -344,30 +346,34 @@ def _get_candidate_port(lower_port, upper_port):
return random.randint(lower_port, upper_port)


def get_server_request(sock: socket.socket) -> Optional[Dict[str, Any]]:
    """Gets a request from the server and returns the corresponding dictionary.

    This code also exists in the R kernel-launcher's server_listener.py script.

    Blocks until a connection is pending on ``sock``, accepts it, reads until the
    peer closes its end, and parses the received bytes as UTF-8 encoded JSON.

    :param sock: the listening control-channel socket.
    :return: the decoded JSON request, or ``None`` when the peer sent no data,
        the socket reported an exceptional condition, or any error occurred
        while accepting, reading, or parsing (errors are logged, not raised).
    """
    conn: Optional[socket.socket] = None
    try:
        # Enterprise gateway establishes a new connection for every request.
        # Wait until a new connection is made, before accepting and reading.
        # The socket is also passed in the exceptional-conditions list: with an
        # empty list there, select() can never report anything in except_ready.
        input_ready, _, except_ready = select.select([sock], [], [sock])
        if input_ready:
            conn, addr = sock.accept()
            logger.info(f"Accepted connection on control channel from {addr=}")
            # Collect raw bytes and decode once at the end; decoding each chunk
            # separately fails when a multi-byte UTF-8 character straddles a
            # recv() boundary.
            chunks = []
            while chunk := conn.recv(1024):  # empty bytes => send is complete
                chunks.append(chunk)
            payload = b"".join(chunks)
            return json.loads(payload.decode("utf-8")) if payload else None

        if except_ready:
            logger.error("Control channel sock reported error state")

    except Exception:
        # Deliberately broad: a failure on one request must not propagate to
        # the caller. Decode and JSON errors end up here as well.
        logger.exception("Control channel socket closed unexpectedly")
    finally:
        if conn:
            conn.close()

    return None


def cancel_spark_jobs(sig, frame):
Expand Down

0 comments on commit 5791f1a

Please sign in to comment.