From cf312f262f2af178de3f741d9be41bea1d6d1654 Mon Sep 17 00:00:00 2001 From: Dmitry Balabka Date: Tue, 29 Aug 2023 16:55:34 +0300 Subject: [PATCH] Fix port number when use NodePort (#806) --- dask_kubernetes/common/networking.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py index c26d9ccd2..e5630631b 100644 --- a/dask_kubernetes/common/networking.py +++ b/dask_kubernetes/common/networking.py @@ -56,18 +56,16 @@ async def get_external_address_for_scheduler_service( local_port=None, ): """Take a service object and return the scheduler address.""" - [port] = [ - port.port - for port in service.spec.ports - if port.name == service.metadata.name or port.name == port_name - ] if service.spec.type == "LoadBalancer": + port = _get_port(service) lb = service.status.load_balancer.ingress[0] host = lb.hostname or lb.ip elif service.spec.type == "NodePort": + port = _get_port(service, is_node_port=True) nodes = await core_api.list_node() host = nodes.items[0].status.addresses[0].address elif service.spec.type == "ClusterIP": + port = _get_port(service) if not port_forward_cluster_ip: with suppress(socket.gaierror): # Try to resolve the service name. If we are inside the cluster this should succeed. @@ -86,6 +84,16 @@ async def get_external_address_for_scheduler_service( return f"tcp://{host}:{port}" +def _get_port(service, port_name, is_node_port=False): + """NodePort is a special case when we have to use node_port instead of node""" + [port] = [ + port.port if not is_node_port else port.node_port + for port in service.spec.ports + if port.name == service.metadata.name or port.name == port_name + ] + return port + + async def _is_service_available(host, port, retries=20): for i in range(retries): try: