diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/taskqueue.py b/compute_endpoint/globus_compute_endpoint/endpoint/taskqueue.py index f7a93d62a..1691320bc 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/taskqueue.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/taskqueue.py @@ -90,6 +90,7 @@ def __init__( self.zmq_socket.setsockopt(zmq.SNDTIMEO, SNDTIMEO) if linger is not None: self.zmq_socket.setsockopt(zmq.LINGER, linger) + self.zmq_socket.setsockopt(zmq.IPV6, True) # all zmq setsockopt calls must be done before bind/connect is called if self.mode == "server": diff --git a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/interchange.py b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/interchange.py index e9d5c1c76..cac715ec0 100644 --- a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/interchange.py +++ b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/interchange.py @@ -232,16 +232,19 @@ def __init__( self.context = zmq.Context() self.task_incoming = self.context.socket(zmq.DEALER) self.task_incoming.set_hwm(0) + self.task_incoming.setsockopt(zmq.IPV6, True) self.task_incoming.RCVTIMEO = 10 # in milliseconds log.info(f"Task incoming on tcp://{client_address}:{client_ports[0]}") self.task_incoming.connect(f"tcp://{client_address}:{client_ports[0]}") self.results_outgoing = self.context.socket(zmq.DEALER) self.results_outgoing.set_hwm(0) + self.results_outgoing.setsockopt(zmq.IPV6, True) log.info(f"Results outgoing on tcp://{client_address}:{client_ports[1]}") self.results_outgoing.connect(f"tcp://{client_address}:{client_ports[1]}") self.command_channel = self.context.socket(zmq.DEALER) + self.command_channel.setsockopt(zmq.IPV6, True) self.command_channel.RCVTIMEO = 1000 # in milliseconds # self.command_channel.set_hwm(0) log.info(f"Command _channel on tcp://{client_address}:{client_ports[2]}") @@ -260,8 +263,10 @@ def __init__( self.task_outgoing = self.context.socket(zmq.ROUTER) self.task_outgoing.set_hwm(0) + self.task_outgoing.setsockopt(zmq.IPV6, True) self.results_incoming = self.context.socket(zmq.ROUTER) self.results_incoming.set_hwm(0) + self.results_incoming.setsockopt(zmq.IPV6, True) self.endpoint_id = endpoint_id worker_bind_address = f"tcp://{self.interchange_address}" diff --git a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/manager.py b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/manager.py index 82f8da75a..0220518a1 100755 --- a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/manager.py +++ b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/manager.py @@ -212,8 +212,9 @@ def __init__( self.internal_worker_port_range = internal_worker_port_range self.funcx_task_socket = self.context.socket(zmq.ROUTER) + self.funcx_task_socket.setsockopt(zmq.IPV6, True) self.funcx_task_socket.set_hwm(0) - self.address = "127.0.0.1" + self.address = "::1" self.worker_port = self.funcx_task_socket.bind_to_random_port( "tcp://*", min_port=self.internal_worker_port_range[0],