Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Rahul Karajgikar <[email protected]>
  • Loading branch information
Rahul Karajgikar committed Sep 5, 2024
1 parent 36e473d commit 9c788fb
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,8 @@
package org.opensearch.cluster.coordination;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.NodeConnectionsService;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.MockEngineFactoryPlugin;
Expand All @@ -51,9 +48,7 @@
import org.opensearch.test.store.MockFSIndexStore;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportConnectionListener;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -104,22 +99,14 @@ public void testTransientErrorsDuringRecovery1AreRetried() throws Exception {
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.build();
// start a cluster-manager node
// start a 3 node cluster with 1 cluster-manager
final String cm = internalCluster().startNode(nodeSettings);

logger.info("--> spawning node t1");
final String blueNodeName = internalCluster().startNode(
Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()
);
logger.info("--> spawning node t2");
internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());

logger.info("--> initial health check");
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
assertThat(response.isTimedOut(), is(false));
logger.info("--> done initial health check");

logger.info("--> creating index");
client().admin()
.indices()
.prepareCreate(indexName)
Expand All @@ -130,63 +117,22 @@ public void testTransientErrorsDuringRecovery1AreRetried() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
.get();
logger.info("--> done creating index");
MockTransportService cmTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, cm);
MockTransportService redTransportService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
redNodeName
);

ClusterService cmClsService = internalCluster().getInstance(ClusterService.class, cm);
// simulate a slow applier on the cm
cmClsService.addStateApplier(new ClusterStateApplier() {
@Override
public void applyClusterState(ClusterChangedEvent event) {
if (event.nodesRemoved()) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// Simulate a slow applier on the cm to delay node-left state application
cmClsService.addStateApplier(event -> {
if (event.nodesRemoved()) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
cmTransportService.connectionManager().addListener(new TransportConnectionListener() {

@Override
public void onConnectionOpened(Transport.Connection connection) {
// try {
// Thread.sleep(500);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }

}

@Override
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
// if (node.getName().equals("node_t2")) {
// try {
// Thread.sleep(250);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
}

// @Override
// public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
});
AtomicBoolean bb = new AtomicBoolean();
// simulate followerchecker failure

ConnectionDelay handlingBehavior = new ConnectionDelay(FOLLOWER_CHECK_ACTION_NAME, () -> {
// Simulate followerchecker failure on 1 node when bb is false
ConnectionDelay handlingBehavior = new ConnectionDelay(() -> {
if (bb.get()) {
return;
}
Expand All @@ -197,55 +143,39 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
}
throw new NodeHealthCheckFailureException("non writable exception");
});
MockTransportService redTransportService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
redNodeName
);
redTransportService.addRequestHandlingBehavior(FOLLOWER_CHECK_ACTION_NAME, handlingBehavior);

// Loop runs 10 times to ensure race condition gets reproduced
for (int i = 0; i < 10; i++) {
bb.set(false); // fail followerchecker by force to trigger node disconnect
logger.info("--> disconnecting from red node, iteration: " + i);
// cmTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode());
bb.set(false);
// fail followerchecker by force to trigger node disconnect
// now followerchecker should fail and trigger node left
logger.info("--> checking cluster health 2 nodes, iteration: " + i);
ClusterHealthResponse response1 = client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
assertThat(response1.isTimedOut(), is(false));
logger.info("--> completed checking cluster health 2 nodes, iteration: " + i);

// once we know a node has left, we can re-enable followerchecker to work normally
bb.set(true);
Thread.sleep(1500);
logger.info("--> checking cluster health 3 nodes, iteration: " + i);
ClusterHealthResponse response2 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
assertThat(response2.isTimedOut(), is(false));
logger.info("--> completed checking cluster health 3 nodes, iteration: " + i);

Thread.sleep(1500);

// Checking again
logger.info("--> checking cluster health 3 nodes again, iteration: " + i);
// Checking again to validate stability
ClusterHealthResponse response3 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
assertThat(response3.isTimedOut(), is(false));
logger.info("--> completed checking cluster health 3 nodes again, iteration: " + i);
}

bb.set(true);
logger.info("-->first validation outside loop");
response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
assertThat(response.isTimedOut(), is(false));

logger.info("-->sleeping for 20s");
Thread.sleep(20000);

logger.info("-->second validation outside loop after sleep");
response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
assertThat(response.isTimedOut(), is(false));
}

private class ConnectionDelay implements StubbableTransport.RequestHandlingBehavior<TransportRequest> {

private final String actionName;
private final Runnable connectionBreaker;

private ConnectionDelay(String actionName, Runnable connectionBreaker) {
this.actionName = actionName;
private ConnectionDelay(Runnable connectionBreaker) {
this.connectionBreaker = connectionBreaker;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,6 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
for (final DiscoveryNode discoveryNode : discoveryNodes) {
nodesToDisconnect.remove(discoveryNode);
}
logger.info(" targetsByNode is {}", targetsByNode.keySet());
logger.info(" nodes to disconnect set is [{}]", nodesToDisconnect);

for (final DiscoveryNode discoveryNode : nodesToDisconnect) {
runnables.add(targetsByNode.get(discoveryNode).disconnect());
Expand All @@ -181,7 +179,7 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
// There might be some stale nodes that are in pendingDisconnect set from before but are not connected anymore
// This code block clears the pending disconnect for these nodes to avoid permanently blocking node joins
// This situation should ideally not happen
transportService.markDisconnectAsCompleted(
transportService.removePendingDisconnections(
transportService.getPendingDisconnections()
.stream()
.filter(discoveryNode -> !discoveryNodes.nodeExists(discoveryNode))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) {
} else if (joinHelper.isJoinPending()) {
logger.trace("onFollowerCheckRequest: rejoining cluster-manager, responding successfully to {}", followerCheckRequest);
} else {
logger.info("Mode: {}, ", mode);
logger.trace("onFollowerCheckRequest: received check from faulty cluster-manager, rejecting {}", followerCheckRequest);
throw new CoordinationStateRejectedException(
"onFollowerCheckRequest: received check from faulty cluster-manager, rejecting " + followerCheckRequest
Expand Down Expand Up @@ -1361,7 +1360,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
// marking pending disconnects before publish
// if we try to joinRequest during pending disconnect, it should fail
transportService.markPendingDisconnects(clusterChangedEvent.nodesDelta());
transportService.setPendingDisconnections(clusterChangedEvent.nodesDelta());
leaderChecker.setCurrentNodes(publishNodes);
followersChecker.setCurrentNodes(publishNodes);
lagDetector.setTrackedNodes(publishNodes);
Expand Down Expand Up @@ -1466,7 +1465,6 @@ private class CoordinatorPeerFinder extends PeerFinder {
protected void onActiveClusterManagerFound(DiscoveryNode clusterManagerNode, long term) {
synchronized (mutex) {
ensureTermAtLeast(clusterManagerNode, term);
logger.info("sending join request to {}", clusterManagerNode);
joinHelper.sendJoinRequest(clusterManagerNode, getCurrentTerm(), joinWithDestination(lastJoin, clusterManagerNode, term));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ public Publication(PublishRequest publishRequest, AckListener ackListener, LongS
}

public void start(Set<DiscoveryNode> faultyNodes) {
logger.trace("publishing {} to {}", publishRequest, publicationTargets);
logger.info("publishing version {} to {}", publishRequest.getAcceptedState().getVersion(), publicationTargets);
logger.debug("publishing version {} to {}", publishRequest.getAcceptedState().getVersion(), publicationTargets);

for (final DiscoveryNode faultyNode : faultyNodes) {
onFaultyNode(faultyNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ private void handleException(String summary, long startTimeMillis, ClusterState
private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) {
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState, taskSummary);
ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
logger.debug("in cluster compute, finished computing new cluster state for version: {}", newClusterState.getVersion());
return new TaskOutputs(
taskInputs,
previousClusterState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -118,23 +117,6 @@ public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfi
internalOpenConnection(node, resolvedProfile, listener);
}

@Override
public void markPendingDisconnects(List<DiscoveryNode> nodes) {
logger.info("marking pending disconnects for nodes: [{}]", nodes);
pendingDisconnections.addAll(nodes);
}

@Override
public Set<DiscoveryNode> getPendingDisconnections() {
return pendingDisconnections;
}

@Override
public void markDisconnectAsCompleted(Set<DiscoveryNode> nodes) {
logger.debug("marking disconnect as completed for nodes: [{}]", nodes);
pendingDisconnections.removeAll(nodes);
}

/**
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
* Once a successful is established, it can be validated before being exposed.
Expand All @@ -147,7 +129,7 @@ public void connectToNode(
ConnectionValidator connectionValidator,
ActionListener<Void> listener
) throws ConnectTransportException {
logger.info("[{}]connecting to node [{}]", Thread.currentThread().getName(), node);
logger.trace("[{}]connecting to node [{}]", Thread.currentThread().getName(), node);
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
if (node == null) {
listener.onFailure(new ConnectTransportException(null, "can't connect to a null node"));
Expand Down Expand Up @@ -195,16 +177,16 @@ public void connectToNode(
assert Transports.assertNotTransportThread("connection validator success");
try {
if (connectedNodes.putIfAbsent(node, conn) != null) {
logger.info("existing connection to node [{}], closing new redundant connection", node);
logger.debug("existing connection to node [{}], closing new redundant connection", node);
IOUtils.closeWhileHandlingException(conn);
} else {
logger.info("connected to node [{}]", node);
logger.debug("connected to node [{}]", node);
try {
connectionListener.onNodeConnected(node, conn);
} finally {
final Transport.Connection finalConnection = conn;
conn.addCloseListener(ActionListener.wrap(() -> {
logger.info("unregistering {} after connection close and marking as disconnected", node);
logger.trace("unregistering {} after connection close and marking as disconnected", node);
connectedNodes.remove(node, finalConnection);
connectionListener.onNodeDisconnected(node, conn);
}));
Expand Down Expand Up @@ -263,7 +245,24 @@ public void disconnectFromNode(DiscoveryNode node) {
nodeChannels.close();
}
pendingDisconnections.remove(node);
logger.info("Removed node {} from pending disconnects list", node);
logger.debug("Removed node [{}] from pending disconnections list", node);
}

@Override
public Set<DiscoveryNode> getPendingDisconnections() {
return pendingDisconnections;
}

@Override
public void setPendingDisconnections(Set<DiscoveryNode> nodes) {
logger.debug("set pending disconnection for nodes: [{}]", nodes);
pendingDisconnections.addAll(nodes);
}

@Override
public void removePendingDisconnections(Set<DiscoveryNode> nodes) {
logger.debug("marking disconnection as completed for nodes: [{}]", nodes);
pendingDisconnections.removeAll(nodes);
}

/**
Expand Down Expand Up @@ -317,7 +316,6 @@ private void internalOpenConnection(
connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection)));
}
if (connection.isClosed()) {
logger.info("channel closed while connecting, throwing exception");
throw new ConnectTransportException(node, "a channel closed while connecting");
}
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.opensearch.core.action.ActionListener;

import java.io.Closeable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

Expand All @@ -53,10 +52,6 @@ public interface ConnectionManager extends Closeable {

void openConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Transport.Connection> listener);

Set<DiscoveryNode> getPendingDisconnections();

void markDisconnectAsCompleted(Set<DiscoveryNode> nodes);

void connectToNode(
DiscoveryNode node,
ConnectionProfile connectionProfile,
Expand All @@ -68,10 +63,14 @@ void connectToNode(

boolean nodeConnected(DiscoveryNode node);

void markPendingDisconnects(List<DiscoveryNode> nodes);

void disconnectFromNode(DiscoveryNode node);

Set<DiscoveryNode> getPendingDisconnections();

void setPendingDisconnections(Set<DiscoveryNode> nodes);

void removePendingDisconnections(Set<DiscoveryNode> nodes);

Set<DiscoveryNode> getAllConnectedNodes();

int size();
Expand Down
Loading

0 comments on commit 9c788fb

Please sign in to comment.