Skip to content

Commit

Permalink
Merge pull request #23739 from vespa-engine/revert-23703-hmusum/clean…
Browse files Browse the repository at this point in the history
…up-15

Revert "Cluster controller unit test cleanup, part 4 [run-systemtest]"
  • Loading branch information
Harald Musum authored Aug 22, 2022
2 parents 9fb7e02 + 402d4e3 commit 7329258
Show file tree
Hide file tree
Showing 16 changed files with 220 additions and 109 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.apps.clustercontroller;

import com.yahoo.jdisc.Metric;
import com.yahoo.vdslib.distribution.ConfiguredNode;
import com.yahoo.vespa.clustercontroller.core.FleetController;
import com.yahoo.vespa.clustercontroller.core.FleetControllerOptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Doesn't really test cluster controller, but runs some lines of code.
* System tests verifies that container can load it..
*/
public class ClusterControllerTest {

private FleetControllerOptions options = new FleetControllerOptions("storage", Set.of(new ConfiguredNode(0, false)));

private final Metric metric = new Metric() {
@Override
public void set(String s, Number number, Context context) {}
@Override
public void add(String s, Number number, Context context) {}
@Override
public Context createContext(Map<String, ?> stringMap) { return null; }
};

@BeforeEach
public void setUp() {
options = new FleetControllerOptions("storage", Set.of(new ConfiguredNode(0, false)));
options.zooKeeperServerAddress = null;
options.slobrokConfigId = "raw:";
options.slobrokConnectionSpecs = null;
}

@Test
void testSimple() throws Exception {
ClusterController cc = new ClusterController();
cc.setOptions(options, metric);
cc.setOptions(options, metric);
assertEquals(1, cc.getFleetControllers().size());
assertNotNull(cc.get("storage"));
assertNull(cc.get("music"));
cc.countdown();
assertTrue(cc.getController("storage").isRunning());
cc.countdown();
assertFalse(cc.getController("storage").isRunning());
}

@Test
void testShutdownException() throws Exception {
ClusterController cc = new ClusterController() {
void shutdownController(FleetController controller) throws Exception {
throw new Exception("Foo");
}
};
cc.setOptions(options, metric);
cc.countdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ static class Params {
Params() {
}

// FIXME de-dupe
static Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTimeMs, int storageTransitionTimeMs) {
Map<com.yahoo.vdslib.state.NodeType, java.lang.Integer> maxTransitionTime = new TreeMap<>();
maxTransitionTime.put(com.yahoo.vdslib.state.NodeType.DISTRIBUTOR, distributorTransitionTimeMs);
Expand All @@ -64,7 +65,7 @@ Params transitionTimes(Map<NodeType, Integer> timesMs) {
this.transitionTimes = timesMs;
return this;
}
Params currentTimeInMillis(long currentTimeMs) {
Params currentTimeInMilllis(long currentTimeMs) {
this.currentTimeInMillis = currentTimeMs;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ private void scheduleVersionDependentTasksForFutureCompletion(int completeAtVers

private AnnotatedClusterState computeCurrentAnnotatedState() {
ClusterStateGenerator.Params params = ClusterStateGenerator.Params.fromOptions(options);
params.currentTimeInMillis(timer.getCurrentTimeInMillis())
params.currentTimeInMilllis(timer.getCurrentTimeInMillis())
.cluster(cluster)
.lowestObservedDistributionBitCount(stateVersionTracker.getLowestObservedDistributionBits());
return ClusterStateGenerator.generatedStateFrom(params);
Expand Down Expand Up @@ -1200,7 +1200,7 @@ public void waitForNodesInSlobrok(int distNodeCount, int storNodeCount, Duration
while (true) {
int distCount = 0, storCount = 0;
for (NodeInfo info : cluster.getNodeInfos()) {
if (info.isInSlobrok()) {
if (!info.isRpcAddressOutdated()) {
if (info.isDistributor()) ++distCount;
else ++storCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import com.yahoo.collections.Pair;
import com.yahoo.jrt.Target;
import java.util.logging.Level;
import com.yahoo.vdslib.distribution.Distribution;
import com.yahoo.vdslib.distribution.Group;
import com.yahoo.vdslib.state.ClusterState;
Expand All @@ -12,12 +13,12 @@
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
Expand Down Expand Up @@ -243,13 +244,11 @@ public int getNodeIndex() {

public ContentCluster getCluster() { return cluster; }

/** Returns true if the node is registered in slobrok */
public boolean isInSlobrok() { return lastSeenInSlobrok == null; }

/** Returns true if the node is NOT registered in slobrok */
public boolean isNotInSlobrok() { return ! isInSlobrok(); }
/** Returns true if the node is currently registered in slobrok */
// FIXME why is this called "isRpcAddressOutdated" then???
public boolean isRpcAddressOutdated() { return lastSeenInSlobrok != null; }

public Long lastSeenInSlobrok() { return lastSeenInSlobrok; }
public Long getRpcAddressOutdatedTimestamp() { return lastSeenInSlobrok; }

public void abortCurrentNodeStateRequests() {
for(Pair<GetNodeStateRequest, Long> it : pendingNodeStateRequests) {
Expand All @@ -276,7 +275,7 @@ public NodeState getWantedState() {
return wantedState;
}

/** Returns the wanted state set directly by a user (i.e. not configured) */
/** Returns the wanted state set directly by a user (i.e not configured) */
public NodeState getUserWantedState() { return wantedState; }

public long getTimeOfFirstFailingConnectionAttempt() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@

import com.yahoo.jrt.ErrorCode;
import com.yahoo.jrt.Target;
import java.util.logging.Level;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeListener;

import java.util.LinkedList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
Expand Down Expand Up @@ -62,10 +63,10 @@ public boolean sendMessages(ContentCluster cluster, Communicator communicator, N
if (requestTime != null && (currentTime - requestTime < nodeStateRequestTimeoutMS)) continue; // pending request
if (info.getTimeForNextStateRequestAttempt() > currentTime) continue; // too early

if (info.getRpcAddress() == null || info.isNotInSlobrok()) { // Cannot query state of node without RPC address or not in slobrok
if (info.getRpcAddress() == null || info.isRpcAddressOutdated()) { // Cannot query state of node without RPC address
log.log(Level.FINE, () -> "Not sending getNodeState request to node " + info.getNode() + ": Not in slobrok");
NodeState reportedState = info.getReportedState().clone();
if (( ! reportedState.getState().equals(State.DOWN) && currentTime - info.lastSeenInSlobrok() > maxSlobrokDisconnectGracePeriod)
if (( ! reportedState.getState().equals(State.DOWN) && currentTime - info.getRpcAddressOutdatedTimestamp() > maxSlobrokDisconnectGracePeriod)
|| reportedState.getState().equals(State.STOPPING)) // Don't wait for grace period if we expect node to be stopping
{
log.log(Level.FINE, () -> "Setting reported state to DOWN "
Expand All @@ -74,8 +75,8 @@ public boolean sendMessages(ContentCluster cluster, Communicator communicator, N
: "as node has been out of slobrok longer than " + maxSlobrokDisconnectGracePeriod + "."));
if (reportedState.getState().oneOf("iur") || ! reportedState.hasDescription()) {
StringBuilder sb = new StringBuilder().append("Set node down as it has been out of slobrok for ")
.append(currentTime - info.lastSeenInSlobrok()).append(" ms which is more than the max limit of ")
.append(maxSlobrokDisconnectGracePeriod).append(" ms.");
.append(currentTime - info.getRpcAddressOutdatedTimestamp()).append(" ms which is more than the max limit of ")
.append(maxSlobrokDisconnectGracePeriod).append(" ms.");
reportedState.setDescription(sb.toString());
}
reportedState.setState(State.DOWN);
Expand Down Expand Up @@ -180,7 +181,7 @@ private NodeState handleError(GetNodeStateRequest req, NodeInfo info, long curre
newState.setState(State.DOWN);
} else if (msg.equals("jrt: Connection closed by peer") || msg.equals("Connection reset by peer")) {
msg = "Connection error: Closed at other end. (Node or switch likely shut down)";
if (info.isNotInSlobrok()) {
if (info.isRpcAddressOutdated()) {
msg += " Node is no longer in slobrok.";
}
if (info.getReportedState().getState().oneOf("ui")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ private boolean nodeStillUnavailableAfterTransitionTimeExceeded(
{
return currentStateInSystem.getState().equals(State.MAINTENANCE)
&& node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN))
&& (lastReportedState.getState().equals(State.DOWN) || node.isNotInSlobrok())
&& (lastReportedState.getState().equals(State.DOWN) || node.isRpcAddressOutdated())
&& node.getTransitionTime() + maxTransitionTime.get(node.getNode().getType()) < currentTime;
}

Expand All @@ -339,14 +339,14 @@ private boolean reportDownIfOutdatedSlobrokNode(ClusterState currentClusterState
NodeInfo node,
NodeState lastReportedState)
{
if (node.isNotInSlobrok()
if (node.isRpcAddressOutdated()
&& !lastReportedState.getState().equals(State.DOWN)
&& node.lastSeenInSlobrok() + maxSlobrokDisconnectGracePeriod <= currentTime)
&& node.getRpcAddressOutdatedTimestamp() + maxSlobrokDisconnectGracePeriod <= currentTime)
{
final String desc = String.format(
"Set node down as it has been out of slobrok for %d ms which " +
"is more than the max limit of %d ms.",
currentTime - node.lastSeenInSlobrok(),
currentTime - node.getRpcAddressOutdatedTimestamp(),
maxSlobrokDisconnectGracePeriod);
node.abortCurrentNodeStateRequests();
NodeState state = lastReportedState.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private void processSetClusterStateResponses() {
}

private static boolean nodeIsReachable(NodeInfo node) {
if (node.getRpcAddress() == null || node.isNotInSlobrok()) {
if (node.getRpcAddress() == null || node.isRpcAddressOutdated()) {
return false; // Can't set state on nodes we don't know where are
}
if (node.getReportedState().getState() == State.MAINTENANCE ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public boolean handleRpcRequests(ContentCluster cluster, ClusterState systemStat
if (!e.getMessage().equals(lastConnectError) || time - lastConnectErrorTime > 60 * 1000) {
lastConnectError = e.getMessage();
lastConnectErrorTime = time;
log.log(Level.WARNING, "Failed to initialize RPC server socket: " + e.getMessage());
log.log(Level.WARNING, "Failed to initailize RPC server socket: " + e.getMessage());
}
}
}
Expand Down Expand Up @@ -227,17 +227,17 @@ public boolean handleRpcRequests(ContentCluster cluster, ClusterState systemStat
}
if (req.methodName().equals("getNodeList")) {
log.log(Level.FINE, "Resolving RPC getNodeList request");
List<String> slobrok = new ArrayList<>();
List<String> rpc = new ArrayList<>();
List<String> slobrok = new ArrayList<String>();
List<String> rpc = new ArrayList<String>();
for(NodeInfo node : cluster.getNodeInfos()) {
String s1 = node.getSlobrokAddress();
String s2 = node.getRpcAddress();
assert(s1 != null);
slobrok.add(s1);
rpc.add(s2 == null ? "" : s2);
}
req.returnValues().add(new StringArray(slobrok.toArray(new String[0])));
req.returnValues().add(new StringArray(rpc.toArray(new String[0])));
req.returnValues().add(new StringArray(slobrok.toArray(new String[slobrok.size()])));
req.returnValues().add(new StringArray(rpc.toArray(new String[rpc.size()])));
req.returnRequest();
} else if (req.methodName().equals("getSystemState")) {
log.log(Level.FINE, "Resolving RPC getSystemState request");
Expand Down Expand Up @@ -280,7 +280,7 @@ public boolean handleRpcRequests(ContentCluster cluster, ClusterState systemStat
NodeState oldState = node.getUserWantedState();
String message = (nodeState.getState().equals(State.UP)
? "Clearing wanted nodeState for node " + node
: "New wantedstate '" + nodeState + "' stored for node " + node);
: "New wantedstate '" + nodeState.toString() + "' stored for node " + node);
if (!oldState.equals(nodeState) || !oldState.getDescription().equals(nodeState.getDescription())) {
if (!nodeState.getState().validWantedNodeState(nodeType)) {
throw new IllegalStateException("State " + nodeState.getState()
Expand All @@ -289,7 +289,7 @@ public boolean handleRpcRequests(ContentCluster cluster, ClusterState systemStat
node.setWantedState(nodeState);
changeListener.handleNewWantedNodeState(node, nodeState);
} else {
message = "Node " + node + " already had wanted state " + nodeState;
message = "Node " + node + " already had wanted state " + nodeState.toString();
log.log(Level.FINE, message);
}
req.returnValues().add(new StringValue(message));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public boolean updateCluster(ContentCluster cluster, SlobrokListener listener) {
}
cluster.setSlobrokGenerationCount(mirrorVersion);
for (NodeInfo nodeInfo : cluster.getNodeInfos()) {
if (slobrokNodes.containsKey(nodeInfo.getNode()) && nodeInfo.isNotInSlobrok()) {
if (slobrokNodes.containsKey(nodeInfo.getNode()) && nodeInfo.isRpcAddressOutdated()) {
context.log(log,
Level.WARNING,
"Node " + nodeInfo
Expand Down Expand Up @@ -187,15 +187,15 @@ private void detectNewAndMissingNodes(
newNext = null;
} else if (newNext == null || newNext.node.compareTo(oldNext.getNode()) > 0) {
assert(slobrokNodes.get(oldNext.getNode()) == null);
if (oldNext.isInSlobrok() && oldNext.getRpcAddress() != null) {
if (!oldNext.isRpcAddressOutdated() && oldNext.getRpcAddress() != null) {
missingNodeInfos.add(oldNext);
}
oldNext = null;
} else {
assert(newNext.rpcAddress != null);
if (oldNext.getRpcAddress() == null || !oldNext.getRpcAddress().equals(newNext.rpcAddress)) {
alteredRpcAddress.add(newNext);
} else if (oldNext.isNotInSlobrok()) {
} else if (oldNext.isRpcAddressOutdated()) {
returningRpcAddressNodeInfos.add(oldNext);
}
oldNext = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private void addRpcAddress(NodeInfo nodeInfo, HtmlTable.Row row) {
row.addCell(new HtmlTable.Cell("-").addProperties(ERROR_PROPERTY));
} else {
row.addCell(new HtmlTable.Cell(HtmlTable.escape(nodeInfo.getRpcAddress())));
if (nodeInfo.isNotInSlobrok()) {
if (nodeInfo.isRpcAddressOutdated()) {
row.getLastCell().addProperties(WARNING_PROPERTY);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeListener;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import static org.mockito.Mockito.mock;

Expand All @@ -21,15 +24,17 @@ public class ClusterFixture {
public final ContentCluster cluster;
public final Distribution distribution;
public final FakeTimer timer;
private final EventLogInterface eventLog;
final StateChangeHandler nodeStateChangeHandler;
private final ClusterStateGenerator.Params params = new ClusterStateGenerator.Params();

public ClusterFixture(ContentCluster cluster, Distribution distribution) {
this.cluster = cluster;
this.distribution = distribution;
this.timer = new FakeTimer();
this.eventLog = mock(EventLogInterface.class);
var context = new FleetControllerContextImpl(new FleetControllerId(cluster.getName(), 0));
this.nodeStateChangeHandler = new StateChangeHandler(context, timer, mock(EventLogInterface.class));
this.nodeStateChangeHandler = new StateChangeHandler(context, timer, eventLog);
this.params.cluster(this.cluster);
}

Expand Down Expand Up @@ -145,6 +150,13 @@ public ClusterFixture assignDummyRpcAddresses() {
return this;
}

static Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) {
Map<NodeType, Integer> maxTransitionTime = new TreeMap<>();
maxTransitionTime.put(NodeType.DISTRIBUTOR, distributorTransitionTime);
maxTransitionTime.put(NodeType.STORAGE, storageTransitionTime);
return maxTransitionTime;
}

void disableTransientMaintenanceModeOnDown() {
this.params.transitionTimes(0);
}
Expand All @@ -162,7 +174,7 @@ ClusterFixture markNodeAsConfigRetired(int nodeIndex) {
}

AnnotatedClusterState annotatedGeneratedClusterState() {
params.currentTimeInMillis(timer.getCurrentTimeInMillis());
params.currentTimeInMilllis(timer.getCurrentTimeInMillis());
return ClusterStateGenerator.generatedStateFrom(params);
}

Expand Down
Loading

0 comments on commit 7329258

Please sign in to comment.