diff --git a/server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java index de6b154fd097a..f5f15253d42d9 100644 --- a/server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java @@ -182,9 +182,9 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) { // to avoid permanently blocking node joins // This situation should ideally not happen, this is just for extra safety transportService.removePendingDisconnections( - targetsByNode.keySet() + transportService.getPendingDisconnections() .stream() - .filter(discoveryNode -> !discoveryNodes.nodeExists(discoveryNode)) + .filter(discoveryNode -> !discoveryNodes.nodeExists(discoveryNode) && !targetsByNode.containsKey(discoveryNode)) .collect(Collectors.toSet()) ); } diff --git a/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java index 58be31135c46e..a5514a74cd437 100644 --- a/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java @@ -247,6 +247,11 @@ public void disconnectFromNode(DiscoveryNode node) { logger.debug("Removed node [{}] from pending disconnections list", node); } + @Override + public Set getPendingDisconnections() { + return pendingDisconnections; + } + @Override public void setPendingDisconnection(DiscoveryNode node) { logger.debug("marking disconnection as pending for node: [{}]", node); diff --git a/server/src/main/java/org/opensearch/transport/ConnectionManager.java b/server/src/main/java/org/opensearch/transport/ConnectionManager.java index 1a5726887f246..ef82910790ec1 100644 --- a/server/src/main/java/org/opensearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/opensearch/transport/ConnectionManager.java @@ -65,6 +65,8 @@ void connectToNode( void disconnectFromNode(DiscoveryNode node); + Set getPendingDisconnections(); + void setPendingDisconnection(DiscoveryNode node); void removePendingDisconnection(DiscoveryNode node); diff --git a/server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java b/server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java index 9eab2c4599e3b..fadcda9c514d9 100644 --- a/server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java +++ b/server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java @@ -114,6 +114,11 @@ public void disconnectFromNode(DiscoveryNode node) { delegate.disconnectFromNode(node); } + @Override + public Set getPendingDisconnections() { + return delegate.getPendingDisconnections(); + } + @Override public void setPendingDisconnection(DiscoveryNode node) { delegate.setPendingDisconnection(node); diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index e65c7a081ab14..d1f04e25322e2 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -773,6 +773,10 @@ public void disconnectFromNode(DiscoveryNode node) { connectionManager.disconnectFromNode(node); } + public Set getPendingDisconnections() { + return connectionManager.getPendingDisconnections(); + } + public void setPendingDisconnections(Set nodes) { nodes.forEach(connectionManager::setPendingDisconnection); } diff --git a/test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java index f68648b31f3b4..feffccd445952 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java @@ -123,6 +123,11 @@ public void disconnectFromNode(DiscoveryNode node) { delegate.disconnectFromNode(node); } + @Override + public Set getPendingDisconnections() { + return delegate.getPendingDisconnections(); + } + @Override public void setPendingDisconnection(DiscoveryNode node) { delegate.setPendingDisconnection(node);