diff --git a/CHANGELOG.md b/CHANGELOG.md index e35ca9c2fcb64..07637eaae3306 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix search_as_you_type not supporting multi-fields ([#15988](https://github.com/opensearch-project/OpenSearch/pull/15988)) - Avoid infinite loop when `flat_object` field contains invalid token ([#15985](https://github.com/opensearch-project/OpenSearch/pull/15985)) - Fix infinite loop in nested agg ([#15931](https://github.com/opensearch-project/OpenSearch/pull/15931)) +- Fix race condition in node-join and node-left ([#15521](https://github.com/opensearch-project/OpenSearch/pull/15521)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java new file mode 100644 index 0000000000000..014e2bf642a4d --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java @@ -0,0 +1,355 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cluster.coordination; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.LoggerConfig; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.cluster.NodeConnectionsService; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.MockEngineFactoryPlugin; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.plugins.Plugin; +import org.opensearch.tasks.Task; +import org.opensearch.test.InternalSettingsPlugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import org.opensearch.test.OpenSearchIntegTestCase.Scope; +import org.opensearch.test.TestLogsAppender; +import org.opensearch.test.store.MockFSIndexStore; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.test.transport.StubbableTransport; +import org.opensearch.transport.ClusterConnectionManager; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; +import org.opensearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME; +import static org.hamcrest.Matchers.is; + +/** + Check https://github.com/opensearch-project/OpenSearch/issues/4874 and + https://github.com/opensearch-project/OpenSearch/pull/15521 for context + */ +@ClusterScope(scope = Scope.TEST, numDataNodes = 0) +public class NodeJoinLeftIT extends OpenSearchIntegTestCase { + + private TestLogsAppender testLogsAppender; + private String clusterManager; + private String redNodeName; + private LoggerContext loggerContext; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList( + MockTransportService.TestPlugin.class, + MockFSIndexStore.TestPlugin.class, + InternalSettingsPlugin.class, + MockEngineFactoryPlugin.class + ); + } + + @Override + protected void beforeIndexDeletion() throws Exception { + super.beforeIndexDeletion(); + internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex(); + internalCluster().assertSeqNos(); + internalCluster().assertSameDocIdsOnShards(); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + // Add any other specific messages you want to capture + List messagesToCapture = Arrays.asList("failed to join", "IllegalStateException"); + testLogsAppender = new TestLogsAppender(messagesToCapture); + loggerContext = (LoggerContext) LogManager.getContext(false); + Configuration config = loggerContext.getConfiguration(); + LoggerConfig loggerConfig = config.getLoggerConfig(ClusterConnectionManager.class.getName()); + loggerConfig.addAppender(testLogsAppender, null, null); + loggerContext.updateLoggers(); + + String indexName = "test"; + final Settings nodeSettings = Settings.builder() + .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms") + .put(NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), "10s") + .put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "200ms") + .put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms") + .put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) + .put(NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), "100ms") + .build(); + // start a 3 node cluster with 1 cluster-manager + this.clusterManager = internalCluster().startNode(nodeSettings); + internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); + this.redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); + + // validate the 3 node cluster is up + ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get(); + assertThat(response.isTimedOut(), is(false)); + + // create an index + client().admin() + .indices() + .prepareCreate(indexName) + .setSettings( + Settings.builder() + .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "blue") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get(); + } + + @After + public void tearDown() throws Exception { + testLogsAppender.clearCapturedLogs(); + loggerContext = (LoggerContext) LogManager.getContext(false); + Configuration config = loggerContext.getConfiguration(); + LoggerConfig loggerConfig = config.getLoggerConfig(ClusterConnectionManager.class.getName()); + loggerConfig.removeAppender(testLogsAppender.getName()); + loggerContext.updateLoggers(); + super.tearDown(); + } + + public void testClusterStabilityWhenJoinRequestHappensDuringNodeLeftTask() throws Exception { + + ClusterService clusterManagerClsService = internalCluster().getInstance(ClusterService.class, clusterManager); + // Simulate a slow applier on the cm to delay node-left state application + clusterManagerClsService.addStateApplier(event -> { + if (event.nodesRemoved()) { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + // Toggle to succeed/fail the followerchecker to simulate the initial node leaving. + AtomicBoolean succeedFollowerChecker = new AtomicBoolean(); + + // Simulate followerchecker failure on 1 node when succeedFollowerChecker is false + FollowerCheckerBehaviour simulatedFailureBehaviour = new FollowerCheckerBehaviour(() -> { + if (succeedFollowerChecker.get()) { + return; + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new NodeHealthCheckFailureException("fake followerchecker failure simulated by test to repro race condition"); + }); + MockTransportService redTransportService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + redNodeName + ); + redTransportService.addRequestHandlingBehavior(FOLLOWER_CHECK_ACTION_NAME, simulatedFailureBehaviour); + + // Loop runs 5 times to ensure race condition gets reproduced + testLogsAppender.clearCapturedLogs(); + for (int i = 0; i < 5; i++) { + logger.info("--> simulating followerchecker failure to trigger node-left"); + succeedFollowerChecker.set(false); + ClusterHealthResponse response1 = client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); + assertThat(response1.isTimedOut(), is(false)); + + // once we know a node has left, we can re-enable followerchecker to work normally and validate node rejoins + logger.info("--> re-enabling normal followerchecker and validating cluster is stable"); + succeedFollowerChecker.set(true); + ClusterHealthResponse response2 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + assertThat(response2.isTimedOut(), is(false)); + + Thread.sleep(1000); + // checking again to validate stability and ensure node did not leave + ClusterHealthResponse response3 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + assertThat(response3.isTimedOut(), is(false)); + } + + succeedFollowerChecker.set(true); + ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + assertThat(response.isTimedOut(), is(false)); + + // assert that join requests fail with the right exception + boolean logFound = testLogsAppender.waitForLog("failed to join", 30, TimeUnit.SECONDS) + && testLogsAppender.waitForLog( + "IllegalStateException[cannot make a new connection as disconnect to node", + 30, + TimeUnit.SECONDS + ); + assertTrue("Expected log was not found within the timeout period", logFound); + } + + public void testClusterStabilityWhenDisconnectDuringSlowNodeLeftTask() throws Exception { + ClusterService clusterManagerClsService = internalCluster().getInstance(ClusterService.class, clusterManager); + // Simulate a slow applier on the cm to delay node-left state application + clusterManagerClsService.addStateApplier(event -> { + if (event.nodesRemoved()) { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + // Toggle to succeed/fail the followerchecker to simulate the initial node leaving. + AtomicBoolean succeedFollowerChecker = new AtomicBoolean(); + + // Simulate followerchecker failure on 1 node when succeedFollowerChecker is false + FollowerCheckerBehaviour simulatedFailureBehaviour = new FollowerCheckerBehaviour(() -> { + if (succeedFollowerChecker.get()) { + return; + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new NodeHealthCheckFailureException("fake followerchecker failure simulated by test to repro race condition"); + }); + MockTransportService cmTransportService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + clusterManager + ); + MockTransportService redTransportService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + redNodeName + ); + redTransportService.addRequestHandlingBehavior(FOLLOWER_CHECK_ACTION_NAME, simulatedFailureBehaviour); + + // Loop runs 5 times to ensure race condition gets reproduced + testLogsAppender.clearCapturedLogs(); + for (int i = 0; i < 5; i++) { + // Fail followerchecker by force to trigger node disconnect and node left + logger.info("--> simulating followerchecker failure to trigger node-left"); + succeedFollowerChecker.set(false); + Thread.sleep(1000); + + // Trigger a node disconnect while node-left task is still processing + logger.info( + "--> triggering a simulated disconnect on red node, after the follower checker failed to see how node-left task deals with this" + ); + cmTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode()); + + ClusterHealthResponse response1 = client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); + assertThat(response1.isTimedOut(), is(false)); + + // once we know a node has left, we can re-enable followerchecker to work normally and validate node rejoins + logger.info("--> re-enabling normal followerchecker and validating cluster is stable"); + succeedFollowerChecker.set(true); + ClusterHealthResponse response2 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + assertThat(response2.isTimedOut(), is(false)); + + Thread.sleep(1000); + // checking again to validate stability and ensure node did not leave + ClusterHealthResponse response3 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + assertThat(response3.isTimedOut(), is(false)); + } + + succeedFollowerChecker.set(true); + ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + assertThat(response.isTimedOut(), is(false)); + + // assert that join requests fail with the right exception + boolean logFound = testLogsAppender.waitForLog("failed to join", 30, TimeUnit.SECONDS); + assertTrue("Expected log was not found within the timeout period", logFound); + logFound = testLogsAppender.waitForLog( + "IllegalStateException[cannot make a new connection as disconnect to node", + 30, + TimeUnit.SECONDS + ); + assertTrue("Expected log was not found within the timeout period", logFound); + } + + public void testRestartDataNode() throws Exception { + + Settings redNodeDataPathSettings = internalCluster().dataPathSettings(redNodeName); + logger.info("-> stopping data node"); + internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(redNodeName)); + ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); + assertThat(response.isTimedOut(), is(false)); + + logger.info("-> restarting stopped node"); + internalCluster().startNode(Settings.builder().put("node.name", redNodeName).put(redNodeDataPathSettings).build()); + response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + assertThat(response.isTimedOut(), is(false)); + } + + public void testRestartCmNode() throws Exception { + + Settings cmNodeSettings = internalCluster().dataPathSettings(clusterManager); + + logger.info("-> stopping cluster-manager node"); + internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(clusterManager)); + ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); + assertThat(response.isTimedOut(), is(false)); + + logger.info("-> restarting stopped node"); + internalCluster().startNode(Settings.builder().put("node.name", clusterManager).put(cmNodeSettings).build()); + response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + assertThat(response.isTimedOut(), is(false)); + } + + private class FollowerCheckerBehaviour implements StubbableTransport.RequestHandlingBehavior { + private final Runnable connectionBreaker; + + private FollowerCheckerBehaviour(Runnable connectionBreaker) { + this.connectionBreaker = connectionBreaker; + } + + @Override + public void messageReceived( + TransportRequestHandler handler, + TransportRequest request, + TransportChannel channel, + Task task + ) throws Exception { + + connectionBreaker.run(); + handler.messageReceived(request, channel, task); + } + } +} diff --git a/server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java index 1c12c260b3929..8ce11c8183cf6 100644 --- a/server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java @@ -103,10 +103,10 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { // contains an entry for every node in the latest cluster state, as well as for nodes from which we are in the process of // disconnecting - private final Map targetsByNode = new HashMap<>(); + protected final Map targetsByNode = new HashMap<>(); private final TimeValue reconnectInterval; - private volatile ConnectionChecker connectionChecker; + protected volatile ConnectionChecker connectionChecker; @Inject public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) { @@ -115,6 +115,11 @@ public NodeConnectionsService(Settings settings, ThreadPool threadPool, Transpor this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings); } + // exposed for testing + protected ConnectionTarget createConnectionTarget(DiscoveryNode discoveryNode) { + return new ConnectionTarget(discoveryNode); + } + /** * Connect to all the given nodes, but do not disconnect from any extra nodes. Calls the completion handler on completion of all * connection attempts to _new_ nodes, but not on attempts to re-establish connections to nodes that are already known. @@ -159,6 +164,14 @@ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) runnables.forEach(Runnable::run); } + public void setPendingDisconnections(Set nodes) { + nodes.forEach(transportService::setPendingDisconnection); + } + + public void clearPendingDisconnections() { + transportService.clearPendingDisconnections(); + } + /** * Disconnect from any nodes to which we are currently connected which do not appear in the given nodes. Does not wait for the * disconnections to complete, because they might have to wait for ongoing connection attempts first. @@ -211,7 +224,7 @@ private void awaitPendingActivity(Runnable onCompletion) { * nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection * attempts have completed. */ - private void connectDisconnectedTargets(Runnable onCompletion) { + protected void connectDisconnectedTargets(Runnable onCompletion) { final List runnables = new ArrayList<>(); synchronized (mutex) { final Collection connectionTargets = targetsByNode.values(); @@ -321,7 +334,7 @@ private enum ActivityType { * * @opensearch.internal */ - private class ConnectionTarget { + protected class ConnectionTarget { private final DiscoveryNode discoveryNode; private PlainListenableActionFuture future = PlainListenableActionFuture.newListenableFuture(); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 13a57d93f03f0..9859abe503eaa 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.ClusterStateTaskConfig; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.LocalClusterUpdateTask; +import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; @@ -187,6 +188,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final NodeHealthService nodeHealthService; private final PersistedStateRegistry persistedStateRegistry; private final RemoteStoreNodeService remoteStoreNodeService; + private NodeConnectionsService nodeConnectionsService; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -418,7 +420,11 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { synchronized (mutex) { final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getClusterManagerNode(); - logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode); + logger.debug( + "handlePublishRequest: handling version [{}] from [{}]", + publishRequest.getAcceptedState().getVersion(), + sourceNode + ); if (sourceNode.equals(getLocalNode()) && mode != Mode.LEADER) { // Rare case in which we stood down as leader between starting this publication and receiving it ourselves. The publication @@ -630,7 +636,6 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> { final ClusterState stateForJoinValidation = getStateForClusterManagerService(); - if (stateForJoinValidation.nodes().isLocalNodeElectedClusterManager()) { onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation)); if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { @@ -814,6 +819,10 @@ public void onFailure(String source, Exception e) { public ClusterTasksResult execute(ClusterState currentState) { if (currentState.nodes().isLocalNodeElectedClusterManager() == false) { allocationService.cleanCaches(); + // This set only needs to be maintained on active cluster-manager + // This is cleaned up to avoid stale entries which would block future reconnections + logger.trace("Removing all pending disconnections as part of cluster-manager cleanup"); + nodeConnectionsService.clearPendingDisconnections(); } return unchanged(); } @@ -914,11 +923,18 @@ public DiscoveryStats stats() { @Override public void startInitialJoin() { synchronized (mutex) { + logger.trace("Starting initial join, becoming candidate"); becomeCandidate("startInitialJoin"); } clusterBootstrapService.scheduleUnconfiguredBootstrap(); } + @Override + public void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) { + assert this.nodeConnectionsService == null : "nodeConnectionsService is already set"; + this.nodeConnectionsService = nodeConnectionsService; + } + @Override protected void doStop() { configuredHostsResolver.stop(); @@ -1356,6 +1372,9 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) currentPublication = Optional.of(publication); final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes(); + // marking pending disconnects before publish + // if a nodes tries to send a joinRequest while it is pending disconnect, it should fail + nodeConnectionsService.setPendingDisconnections(new HashSet<>(clusterChangedEvent.nodesDelta().removedNodes())); leaderChecker.setCurrentNodes(publishNodes); followersChecker.setCurrentNodes(publishNodes); lagDetector.setTrackedNodes(publishNodes); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Publication.java b/server/src/main/java/org/opensearch/cluster/coordination/Publication.java index 43801a05dbc24..3f7218939be92 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Publication.java @@ -85,7 +85,7 @@ public Publication(PublishRequest publishRequest, AckListener ackListener, LongS } public void start(Set faultyNodes) { - logger.trace("publishing {} to {}", publishRequest, publicationTargets); + logger.debug("publishing version {} to {}", publishRequest.getAcceptedState().getVersion(), publicationTargets); for (final DiscoveryNode faultyNode : faultyNodes) { onFaultyNode(faultyNode); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index cdf331b7bb577..caed2b6eceb49 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -542,7 +542,7 @@ public String executor() { } public void sendClusterState(DiscoveryNode destination, ActionListener listener) { - logger.debug("sending cluster state over transport to node: {}", destination.getName()); + logger.trace("sending cluster state over transport to node: {}", destination.getName()); if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) { logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination); sendFullClusterState(destination, listener); diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java index 47080cfbde692..d0b6f812e9ee2 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java @@ -502,6 +502,7 @@ private void runTask(UpdateTask task) { try { applyChanges(task, previousClusterState, newClusterState, stopWatch); TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS)); + // At this point, cluster state appliers and listeners are completed logger.debug( "processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source, @@ -510,6 +511,7 @@ private void runTask(UpdateTask task) { newClusterState.stateUUID() ); warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch); + // Then we call the ClusterApplyListener of the task task.listener.onSuccess(task.source); } catch (Exception e) { TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS)); @@ -578,6 +580,7 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl logger.debug("apply cluster state with version {}", newClusterState.version()); callClusterStateAppliers(clusterChangedEvent, stopWatch); + logger.debug("completed calling appliers of cluster state for version {}", newClusterState.version()); nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes()); @@ -594,6 +597,7 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl state.set(newClusterState); callClusterStateListeners(clusterChangedEvent, stopWatch); + logger.debug("completed calling listeners of cluster state for version {}", newClusterState.version()); } protected void connectToNodesAndWait(ClusterState newClusterState) { diff --git a/server/src/main/java/org/opensearch/discovery/Discovery.java b/server/src/main/java/org/opensearch/discovery/Discovery.java index 9d6807b6522c9..6d9fb1f4985df 100644 --- a/server/src/main/java/org/opensearch/discovery/Discovery.java +++ b/server/src/main/java/org/opensearch/discovery/Discovery.java @@ -32,6 +32,7 @@ package org.opensearch.discovery; +import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.cluster.coordination.ClusterStatePublisher; import org.opensearch.common.lifecycle.LifecycleComponent; @@ -54,4 +55,8 @@ public interface Discovery extends LifecycleComponent, ClusterStatePublisher { */ void startInitialJoin(); + /** + * Sets the NodeConnectionsService which is an abstraction used for connection management + */ + void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a8d4ebcf23dab..4962d72d8728a 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1602,6 +1602,7 @@ public Node start() throws NodeValidationException { injector.getInstance(GatewayService.class).start(); Discovery discovery = injector.getInstance(Discovery.class); + discovery.setNodeConnectionsService(nodeConnectionsService); clusterService.getClusterManagerService().setClusterStatePublisher(discovery::publish); // Start the transport service now so the publish address will be added to the local disco node in ClusterService diff --git a/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java index e634323d58269..3a3e8c964b6c5 100644 --- a/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java @@ -64,6 +64,15 @@ public class ClusterConnectionManager implements ConnectionManager { private final ConcurrentMap connectedNodes = ConcurrentCollections.newConcurrentMap(); private final ConcurrentMap> pendingConnections = ConcurrentCollections.newConcurrentMap(); + /** + This set is used only by cluster-manager nodes. + Nodes are marked as pending disconnect right before cluster state publish phase. + They are cleared up as part of cluster state apply commit phase + This is to avoid connections from being made to nodes that are in the process of leaving the cluster + Note: If a disconnect is initiated while a connect is in progress, this Set will not handle this case. + Callers need to ensure that connects and disconnects are sequenced. + */ + private final Set pendingDisconnections = ConcurrentCollections.newConcurrentSet(); private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") { @Override protected void closeInternal() { @@ -122,12 +131,19 @@ public void connectToNode( ConnectionValidator connectionValidator, ActionListener listener ) throws ConnectTransportException { + logger.trace("connecting to node [{}]", node); ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile); if (node == null) { listener.onFailure(new ConnectTransportException(null, "can't connect to a null node")); return; } + // if node-left is still in progress, we fail the connect request early + if (pendingDisconnections.contains(node)) { + listener.onFailure(new IllegalStateException("cannot make a new connection as disconnect to node [" + node + "] is pending")); + return; + } + if (connectingRefCounter.tryIncRef() == false) { listener.onFailure(new IllegalStateException("connection manager is closed")); return; @@ -170,6 +186,7 @@ public void connectToNode( conn.addCloseListener(ActionListener.wrap(() -> { logger.trace("unregistering {} after connection close and marking as disconnected", node); connectedNodes.remove(node, finalConnection); + pendingDisconnections.remove(node); connectionListener.onNodeDisconnected(node, conn); })); } @@ -226,6 +243,19 @@ public void disconnectFromNode(DiscoveryNode node) { // if we found it and removed it we close nodeChannels.close(); } + pendingDisconnections.remove(node); + logger.trace("Removed node [{}] from pending disconnections list", node); + } + + @Override + public void setPendingDisconnection(DiscoveryNode node) { + logger.trace("marking disconnection as pending for node: [{}]", node); + pendingDisconnections.add(node); + } + + @Override + public void clearPendingDisconnections() { + pendingDisconnections.clear(); } /** diff --git a/server/src/main/java/org/opensearch/transport/ConnectionManager.java b/server/src/main/java/org/opensearch/transport/ConnectionManager.java index 10cfc2907098f..ebd5ccf29c8cc 100644 --- a/server/src/main/java/org/opensearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/opensearch/transport/ConnectionManager.java @@ -65,6 +65,10 @@ void connectToNode( void disconnectFromNode(DiscoveryNode node); + void setPendingDisconnection(DiscoveryNode node); + + void clearPendingDisconnections(); + Set getAllConnectedNodes(); int size(); diff --git a/server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java b/server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java index bd646f10df517..52f29bea8050d 100644 --- a/server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java +++ b/server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java @@ -114,6 +114,16 @@ public void disconnectFromNode(DiscoveryNode node) { delegate.disconnectFromNode(node); } + @Override + public void setPendingDisconnection(DiscoveryNode node) { + delegate.setPendingDisconnection(node); + } + + @Override + public void clearPendingDisconnections() { + delegate.clearPendingDisconnections(); + } + @Override public ConnectionProfile getConnectionProfile() { return delegate.getConnectionProfile(); diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index fff6d82b23c7e..fe8631aa5ca3d 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -773,6 +773,18 @@ public void disconnectFromNode(DiscoveryNode node) { connectionManager.disconnectFromNode(node); } + public void setPendingDisconnection(DiscoveryNode node) { + connectionManager.setPendingDisconnection(node); + } + + /** + * Wipes out all pending disconnections. + * This is called on cluster-manager failover to remove stale entries + */ + public void clearPendingDisconnections() { + connectionManager.clearPendingDisconnections(); + } + public void addMessageListener(TransportMessageListener listener) { messageListener.listeners.add(listener); } diff --git a/server/src/test/java/org/opensearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/opensearch/cluster/NodeConnectionsServiceTests.java index 4cf82f1dabab3..4500860c937ea 100644 --- a/server/src/test/java/org/opensearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/NodeConnectionsServiceTests.java @@ -35,6 +35,9 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.LoggerConfig; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.Version; import org.opensearch.action.support.PlainActionFuture; @@ -53,9 +56,11 @@ import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.TestLogsAppender; import org.opensearch.test.junit.annotations.TestLogging; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.ClusterConnectionManager; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.ConnectionProfile; import org.opensearch.transport.Transport; @@ -69,6 +74,7 @@ import org.junit.Before; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -77,6 +83,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import static java.util.Collections.emptySet; @@ -86,12 +93,15 @@ import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.opensearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class NodeConnectionsServiceTests extends OpenSearchTestCase { private ThreadPool threadPool; private TransportService transportService; private Map> nodeConnectionBlocks; + private TestLogsAppender testLogsAppender; + private LoggerContext loggerContext; private List generateNodes() { List nodes = new ArrayList<>(); @@ -490,6 +500,108 @@ public void testDebugLogging() throws IllegalAccessException { } } + public void testConnectionCheckerRetriesIfPendingDisconnection() throws InterruptedException { + final Settings.Builder settings = Settings.builder(); + final long reconnectIntervalMillis = 50; + settings.put(CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), reconnectIntervalMillis + "ms"); + + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( + builder().put(NODE_NAME_SETTING.getKey(), "node").build(), + random() + ); + + MockTransport transport = new MockTransport(deterministicTaskQueue.getThreadPool()); + TestTransportService transportService = new TestTransportService(transport, deterministicTaskQueue.getThreadPool()); + transportService.start(); + transportService.acceptIncomingRequests(); + + final TestNodeConnectionsService service = new TestNodeConnectionsService( + settings.build(), + deterministicTaskQueue.getThreadPool(), + transportService + ); + service.start(); + + // setup the connections + final DiscoveryNode node = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); + + final DiscoveryNodes nodes = DiscoveryNodes.builder().add(node).build(); + + final AtomicBoolean connectionCompleted = new AtomicBoolean(); + service.connectToNodes(nodes, () -> connectionCompleted.set(true)); + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue(connectionCompleted.get()); + + // reset any logs as we want to assert for exceptions that show up after this + // reset connect to node count to assert for later + logger.info("--> resetting captured logs and counters"); + testLogsAppender.clearCapturedLogs(); + // this ensures we only track connection attempts that happen after the disconnection + transportService.resetConnectToNodeCallCount(); + + // block connection checker reconnection attempts until after we set pending disconnections + logger.info("--> disabling connection checker, and triggering disconnect"); + service.setShouldReconnect(false); + transportService.disconnectFromNode(node); + + // set pending disconnections to true to fail future reconnection attempts + final long maxDisconnectionTime = 1000; + deterministicTaskQueue.scheduleNow(new Runnable() { + @Override + public void run() { + logger.info("--> setting pending disconnections to fail next connection attempts"); + service.setPendingDisconnections(new HashSet<>(Collections.singleton(node))); + } + + @Override + public String toString() { + return "scheduled disconnection of " + node; + } + }); + // our task queue will have the first task as the runnable to set pending disconnections + // here we re-enable the connection checker to enqueue next tasks for attempting reconnection + logger.info("--> re-enabling reconnection checker"); + service.setShouldReconnect(true); + + final long maxReconnectionTime = 2000; + final int expectedReconnectionAttempts = 10; + + // this will first run the task to set the pending disconnections, then will execute the reconnection tasks + // exit early when we have enough reconnection attempts + logger.info("--> running tasks in order until expected reconnection attempts"); + runTasksInOrderUntilExpectedReconnectionAttempts( + deterministicTaskQueue, + maxDisconnectionTime + maxReconnectionTime, + transportService, + expectedReconnectionAttempts + ); + logger.info("--> verifying that connectionchecker tried to reconnect"); + + // assert that the connections failed + assertFalse("connected to " + node, transportService.nodeConnected(node)); + + // assert that we saw at least the required number of reconnection attempts, and the exceptions that showed up are as expected + logger.info("--> number of reconnection attempts: {}", transportService.getConnectToNodeCallCount()); + assertThat( + "Did not see enough reconnection attempts from connection checker", + transportService.getConnectToNodeCallCount(), + greaterThan(expectedReconnectionAttempts) + ); + boolean logFound = testLogsAppender.waitForLog("failed to connect", 1, TimeUnit.SECONDS) + && testLogsAppender.waitForLog( + "IllegalStateException: cannot make a new connection as disconnect to node", + 1, + TimeUnit.SECONDS + ); + assertTrue("Expected log for reconnection failure was not found in the required time period", logFound); + + // clear the pending disconnections and ensure the connection gets re-established automatically by connectionchecker + logger.info("--> clearing pending disconnections to allow connections to re-establish"); + service.clearPendingDisconnections(); + runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + maxReconnectionTime + 2 * reconnectIntervalMillis); + assertConnectedExactlyToNodes(transportService, nodes); + } + private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long endTimeMillis) { while (deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) { if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) { @@ -501,6 +613,24 @@ private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long e deterministicTaskQueue.runAllRunnableTasks(); } + private void runTasksInOrderUntilExpectedReconnectionAttempts( + DeterministicTaskQueue deterministicTaskQueue, + long endTimeMillis, + TestTransportService transportService, + int expectedReconnectionAttempts + ) { + // break the loop if we timeout or if we have enough reconnection attempts + while ((deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) + && (transportService.getConnectToNodeCallCount() <= expectedReconnectionAttempts)) { + if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) { + deterministicTaskQueue.runNextTask(); + } else if (deterministicTaskQueue.hasDeferredTasks()) { + deterministicTaskQueue.advanceTime(); + } + } + deterministicTaskQueue.runAllRunnableTasksInEnqueuedOrder(); + } + private void ensureConnections(NodeConnectionsService service) { final PlainActionFuture future = new PlainActionFuture<>(); service.ensureConnections(() -> future.onResponse(null)); @@ -526,6 +656,16 @@ private void assertConnected(TransportService transportService, Iterable messagesToCapture = Arrays.asList("failed to connect", "IllegalStateException"); + testLogsAppender = new TestLogsAppender(messagesToCapture); + loggerContext = (LoggerContext) LogManager.getContext(false); + Configuration config = loggerContext.getConfiguration(); + LoggerConfig loggerConfig = config.getLoggerConfig(NodeConnectionsService.class.getName()); + loggerConfig.addAppender(testLogsAppender, null, null); + loggerConfig = config.getLoggerConfig(ClusterConnectionManager.class.getName()); + loggerConfig.addAppender(testLogsAppender, null, null); + loggerContext.updateLoggers(); ThreadPool threadPool = new TestThreadPool(getClass().getName()); this.threadPool = threadPool; nodeConnectionBlocks = newConcurrentMap(); @@ -537,6 +677,14 @@ public void setUp() throws Exception { @Override @After public void tearDown() throws Exception { + testLogsAppender.clearCapturedLogs(); + loggerContext = (LoggerContext) LogManager.getContext(false); + Configuration config = loggerContext.getConfiguration(); + LoggerConfig loggerConfig = config.getLoggerConfig(NodeConnectionsService.class.getName()); + loggerConfig.removeAppender(testLogsAppender.getName()); + loggerConfig = config.getLoggerConfig(ClusterConnectionManager.class.getName()); + loggerConfig.removeAppender(testLogsAppender.getName()); + loggerContext.updateLoggers(); transportService.stop(); ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); threadPool = null; @@ -545,6 +693,8 @@ public void tearDown() throws Exception { private final class TestTransportService extends TransportService { + private final AtomicInteger connectToNodeCallCount = new AtomicInteger(0); + private TestTransportService(Transport transport, ThreadPool threadPool) { super( Settings.EMPTY, @@ -588,6 +738,47 @@ public void connectToNode(DiscoveryNode node, ActionListener listener) thr } else { super.connectToNode(node, listener); } + logger.info("calling connectToNode"); + connectToNodeCallCount.incrementAndGet(); + } + + public int getConnectToNodeCallCount() { + return connectToNodeCallCount.get(); + } + + public void resetConnectToNodeCallCount() { + connectToNodeCallCount.set(0); + } + } + + private class TestNodeConnectionsService extends NodeConnectionsService { + private boolean shouldReconnect = true; + + public TestNodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) { + super(settings, threadPool, transportService); + } + + public void setShouldReconnect(boolean shouldReconnect) { + this.shouldReconnect = shouldReconnect; + } + + @Override + protected void doStart() { + final StoppableConnectionChecker connectionChecker = new StoppableConnectionChecker(); + this.connectionChecker = connectionChecker; + connectionChecker.scheduleNextCheck(); + } + + class StoppableConnectionChecker extends NodeConnectionsService.ConnectionChecker { + @Override + protected void doRun() { + if (connectionChecker == this && shouldReconnect) { + connectDisconnectedTargets(this::scheduleNextCheck); + } else { + // Skip reconnection attempt but still schedule the next check + scheduleNextCheck(); + } + } } } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 350c6f9ae8f6b..440227436175d 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1923,11 +1923,6 @@ private final class TestClusterNode { protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool); } - - @Override - protected void connectToNodesAndWait(ClusterState newClusterState) { - // don't do anything, and don't block - } } ); recoverySettings = new RecoverySettings(settings, clusterSettings); @@ -2094,7 +2089,7 @@ public void onFailure(final Exception e) { rerouteService, threadPool ); - nodeConnectionsService = new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService); + nodeConnectionsService = createTestNodeConnectionsService(clusterService.getSettings(), threadPool, transportService); final MetadataMappingService metadataMappingService = new MetadataMappingService(clusterService, indicesService); indicesClusterStateService = new IndicesClusterStateService( settings, @@ -2492,6 +2487,24 @@ protected void assertSnapshotOrGenericThread() { } } + public NodeConnectionsService createTestNodeConnectionsService( + Settings settings, + ThreadPool threadPool, + TransportService transportService + ) { + return new NodeConnectionsService(settings, threadPool, transportService) { + @Override + public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) { + // just update targetsByNode to ensure disconnect runs for these nodes + // we rely on disconnect to run for keeping track of pendingDisconnects and ensuring node-joins can happen + for (final DiscoveryNode discoveryNode : discoveryNodes) { + this.targetsByNode.put(discoveryNode, createConnectionTarget(discoveryNode)); + } + onCompletion.run(); + } + }; + } + public ClusterInfoService getMockClusterInfoService() { return clusterInfoService; } @@ -2563,10 +2576,11 @@ public void start(ClusterState initialState) { new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), null ); + coordinator.setNodeConnectionsService(nodeConnectionsService); clusterManagerService.setClusterStatePublisher(coordinator); - coordinator.start(); clusterService.getClusterApplierService().setNodeConnectionsService(nodeConnectionsService); nodeConnectionsService.start(); + coordinator.start(); clusterService.start(); indicesService.start(); indicesClusterStateService.start(); diff --git a/server/src/test/java/org/opensearch/test/NoopDiscovery.java b/server/src/test/java/org/opensearch/test/NoopDiscovery.java index 42d3f1887ab4d..c35503a556db6 100644 --- a/server/src/test/java/org/opensearch/test/NoopDiscovery.java +++ b/server/src/test/java/org/opensearch/test/NoopDiscovery.java @@ -32,6 +32,7 @@ package org.opensearch.test; import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.common.lifecycle.Lifecycle; import org.opensearch.common.lifecycle.LifecycleListener; import org.opensearch.core.action.ActionListener; @@ -55,6 +56,11 @@ public void startInitialJoin() { } + @Override + public void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) { + + } + @Override public Lifecycle.State lifecycleState() { return null; diff --git a/server/src/test/java/org/opensearch/transport/ClusterConnectionManagerTests.java b/server/src/test/java/org/opensearch/transport/ClusterConnectionManagerTests.java index 1d734a56ef189..fdf762aa096f0 100644 --- a/server/src/test/java/org/opensearch/transport/ClusterConnectionManagerTests.java +++ b/server/src/test/java/org/opensearch/transport/ClusterConnectionManagerTests.java @@ -320,6 +320,50 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti assertEquals(0, nodeDisconnectedCount.get()); } + public void testConnectFailsWhenDisconnectIsPending() { + AtomicInteger nodeConnectedCount = new AtomicInteger(); + AtomicInteger nodeDisconnectedCount = new AtomicInteger(); + connectionManager.addListener(new TransportConnectionListener() { + @Override + public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) { + nodeConnectedCount.incrementAndGet(); + } + + @Override + public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + nodeDisconnectedCount.incrementAndGet(); + } + }); + + DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT); + ConnectionManager.ConnectionValidator validator = (c, p, l) -> l.onResponse(null); + Transport.Connection connection = new TestConnect(node); + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(connection); + return null; + }).when(transport).openConnection(eq(node), eq(connectionProfile), any(ActionListener.class)); + assertFalse(connectionManager.nodeConnected(node)); + + // Mark connection as pending disconnect, any connection attempt should fail + connectionManager.setPendingDisconnection(node); + PlainActionFuture fut = new PlainActionFuture<>(); + connectionManager.connectToNode(node, connectionProfile, validator, fut); + expectThrows(IllegalStateException.class, () -> fut.actionGet()); + + // clear the pending disconnect and assert that connection succeeds + connectionManager.clearPendingDisconnections(); + assertFalse(connectionManager.nodeConnected(node)); + PlainActionFuture.get( + future -> connectionManager.connectToNode(node, connectionProfile, validator, ActionListener.map(future, x -> null)) + ); + assertFalse(connection.isClosed()); + assertTrue(connectionManager.nodeConnected(node)); + assertEquals(1, connectionManager.size()); + assertEquals(1, nodeConnectedCount.get()); + assertEquals(0, nodeDisconnectedCount.get()); + } + private static class TestConnect extends CloseableConnection { private final DiscoveryNode node; diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index b432e5411404e..3efcc538a1b25 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -55,6 +55,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; @@ -1150,9 +1151,12 @@ protected Optional getDisruptableMockTransport(Transpo new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); clusterService = new ClusterService(settings, clusterSettings, clusterManagerService, clusterApplierService); - clusterService.setNodeConnectionsService( - new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService) + NodeConnectionsService nodeConnectionsService = createTestNodeConnectionsService( + clusterService.getSettings(), + threadPool, + transportService ); + clusterService.setNodeConnectionsService(nodeConnectionsService); repositoriesService = new RepositoriesService( settings, clusterService, @@ -1187,6 +1191,7 @@ protected Optional getDisruptableMockTransport(Transpo new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), null ); + coordinator.setNodeConnectionsService(nodeConnectionsService); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService( settings, @@ -1588,6 +1593,24 @@ public void onNodeAck(DiscoveryNode node, Exception e) { } } + public static NodeConnectionsService createTestNodeConnectionsService( + Settings settings, + ThreadPool threadPool, + TransportService transportService + ) { + return new NodeConnectionsService(settings, threadPool, transportService) { + @Override + public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) { + // just update targetsByNode to ensure disconnect runs for these nodes + // we rely on disconnect to run for keeping track of pendingDisconnects and ensuring node-joins can happen + for (final DiscoveryNode discoveryNode : discoveryNodes) { + this.targetsByNode.put(discoveryNode, createConnectionTarget(discoveryNode)); + } + onCompletion.run(); + } + }; + } + static class DisruptableClusterApplierService extends ClusterApplierService { private final String nodeName; private final DeterministicTaskQueue deterministicTaskQueue; @@ -1641,11 +1664,6 @@ public void onNewClusterState(String source, Supplier clusterState } } - @Override - protected void connectToNodesAndWait(ClusterState newClusterState) { - // don't do anything, and don't block - } - @Override protected boolean applicationMayFail() { return this.applicationMayFail; diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/DeterministicTaskQueue.java index 1ad18bf89d5ba..4f692c7bc8f62 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/DeterministicTaskQueue.java @@ -92,6 +92,12 @@ public void runAllRunnableTasks() { } } + public void runAllRunnableTasksInEnqueuedOrder() { + while (hasRunnableTasks()) { + runTask(0); + } + } + public void runAllTasks() { while (hasDeferredTasks() || hasRunnableTasks()) { if (hasDeferredTasks() && random.nextBoolean()) { @@ -141,6 +147,11 @@ public void runRandomTask() { runTask(RandomNumbers.randomIntBetween(random, 0, runnableTasks.size() - 1)); } + public void runNextTask() { + assert hasRunnableTasks(); + runTask(0); + } + private void runTask(final int index) { final Runnable task = runnableTasks.remove(index); logger.trace("running task {} of {}: {}", index, runnableTasks.size() + 1, task); diff --git a/test/framework/src/main/java/org/opensearch/test/TestLogsAppender.java b/test/framework/src/main/java/org/opensearch/test/TestLogsAppender.java new file mode 100644 index 0000000000000..030f399a5bcc0 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/test/TestLogsAppender.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.test; + +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.layout.PatternLayout; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Test logs appender that provides functionality to extract specific logs/exception messages and wait for it to show up + * @opensearch.internal + */ +public class TestLogsAppender extends AbstractAppender { + private final List capturedLogs = new ArrayList<>(); + private final List messagesToCapture; + + public TestLogsAppender(List messagesToCapture) { + super("TestAppender", null, PatternLayout.createDefaultLayout(), false, Property.EMPTY_ARRAY); + this.messagesToCapture = messagesToCapture; + start(); + } + + @Override + public void append(LogEvent event) { + if (shouldCaptureMessage(event.getMessage().getFormattedMessage())) capturedLogs.add(event.getMessage().getFormattedMessage()); + if (event.getThrown() != null) { + if (shouldCaptureMessage(event.getThrown().toString())) capturedLogs.add(event.getThrown().toString()); + for (StackTraceElement element : event.getThrown().getStackTrace()) + if (shouldCaptureMessage(element.toString())) capturedLogs.add(element.toString()); + } + } + + public boolean shouldCaptureMessage(String log) { + return messagesToCapture.stream().anyMatch(log::contains); + } + + public List getCapturedLogs() { + return new ArrayList<>(capturedLogs); + } + + public boolean waitForLog(String expectedLog, long timeout, TimeUnit unit) { + long startTime = System.currentTimeMillis(); + long timeoutInMillis = unit.toMillis(timeout); + + while (System.currentTimeMillis() - startTime < timeoutInMillis) { + if (capturedLogs.stream().anyMatch(log -> log.contains(expectedLog))) { + return true; + } + try { + Thread.sleep(100); // Wait for 100ms before checking again + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + return false; + } + + // Clear captured logs + public void clearCapturedLogs() { + capturedLogs.clear(); + } +} diff --git a/test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java index 37df90fb103a3..d1e1a3e8af17c 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java @@ -123,6 +123,16 @@ public void disconnectFromNode(DiscoveryNode node) { delegate.disconnectFromNode(node); } + @Override + public void setPendingDisconnection(DiscoveryNode node) { + delegate.setPendingDisconnection(node); + } + + @Override + public void clearPendingDisconnections() { + delegate.clearPendingDisconnections(); + } + @Override public int size() { return delegate.size();