Skip to content

Commit

Permalink
Merge pull request #372 from Vlatombe/fail-fast-on-cloud-node-removal
Browse files Browse the repository at this point in the history
Fail fast on cloud node removal
  • Loading branch information
jglick authored May 15, 2024
2 parents 295673a + 04e4192 commit 1891ab0
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 58 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@
<version>1.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>mock-slave</artifactId>
<version>153.v9768799a_2294</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
} else {
LOGGER.fine(() -> "rediscovering that " + node + " has been removed and timeout has expired");
listener().getLogger().println(node + " has been removed for " + Util.getTimeSpanString(ExecutorStepExecution.TIMEOUT_WAITING_FOR_NODE_MILLIS) + ", assuming it is not coming back");
throw new FlowInterruptedException(Result.ABORTED, /* TODO false probably more appropriate */true, new ExecutorStepExecution.RemovedNodeCause());
throw new FlowInterruptedException(Result.ABORTED, /* TODO false probably more appropriate */true, new ExecutorStepExecution.RemovedNodeTimeoutCause());

Check warning on line 370 in src/main/java/org/jenkinsci/plugins/workflow/steps/durable_task/DurableTaskStep.java

View check run for this annotation

ci.jenkins.io / Code Coverage

Not covered line

Line 370 is not covered by tests

Check warning on line 370 in src/main/java/org/jenkinsci/plugins/workflow/steps/durable_task/DurableTaskStep.java

View check run for this annotation

ci.jenkins.io / Open Tasks Scanner

TODO

NORMAL: false probably more appropriate */true, new ExecutorStepExecution.RemovedNodeTimeoutCause());
}
}
removedNodeDiscovered = 0; // something else; reset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected Executor tryResolve() throws Exception {
Queue.getInstance().cancel(item);
owner.getListener().getLogger().printf("Killed %s after waiting for %s because we assume unknown agent %s is never going to appear%n",
item.task.getDisplayName(), Util.getTimeSpanString(ExecutorStepExecution.TIMEOUT_WAITING_FOR_NODE_MILLIS), placeholder.getAssignedLabel());
throw new FlowInterruptedException(Result.ABORTED, new ExecutorStepExecution.RemovedNodeCause());
throw new FlowInterruptedException(Result.ABORTED, new ExecutorStepExecution.RemovedNodeTimeoutCause());

Check warning on line 125 in src/main/java/org/jenkinsci/plugins/workflow/support/pickles/ExecutorPickle.java

View check run for this annotation

ci.jenkins.io / Code Coverage

Not covered line

Line 125 is not covered by tests
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.stream.Stream;
import jenkins.model.CauseOfInterruption;
import org.jenkinsci.Symbol;
import org.jenkinsci.plugins.workflow.flow.ErrorCondition;
import org.jenkinsci.plugins.workflow.steps.FlowInterruptedException;
Expand All @@ -57,8 +58,7 @@ public final class AgentErrorCondition extends ErrorCondition {
if (t instanceof AgentOfflineException) {
return true;
}
if (t instanceof FlowInterruptedException && ((FlowInterruptedException) t).getCauses().stream().anyMatch(
c -> c instanceof ExecutorStepExecution.RemovedNodeCause || c instanceof ExecutorStepExecution.QueueTaskCancelled)) {
if (t instanceof FlowInterruptedException && ((FlowInterruptedException) t).getCauses().stream().anyMatch(Retryable.class::isInstance)) {
return true;
}
if (isClosedChannelException(t)) {
Expand Down Expand Up @@ -90,6 +90,11 @@ private static boolean isClosedChannelException(Throwable t) {
}
}

/**
 * A marker interface for {@link CauseOfInterruption} instances that can be retried through {@link AgentErrorCondition}.
 * <p>It declares no methods. {@link AgentErrorCondition} matches a {@link FlowInterruptedException}
 * when at least one of its causes is an instance of this interface.
 */
public interface Retryable {}

@Symbol("agent")
@Extension public static final class DescriptorImpl extends ErrorConditionDescriptor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void resume(StepContext context) throws Exception {
exec = item.getFuture().getStartCondition().get(ExecutorStepExecution.TIMEOUT_WAITING_FOR_NODE_MILLIS, TimeUnit.MILLISECONDS);
} catch (TimeoutException x) {
listener.getLogger().println(node + " has been removed for " + Util.getTimeSpanString(ExecutorStepExecution.TIMEOUT_WAITING_FOR_NODE_MILLIS) + ", assuming it is not coming back");
throw new FlowInterruptedException(Result.ABORTED, /* TODO false probably more appropriate */true, new ExecutorStepExecution.RemovedNodeCause());
throw new FlowInterruptedException(Result.ABORTED, /* TODO false probably more appropriate */true, new ExecutorStepExecution.RemovedNodeTimeoutCause());

Check warning on line 110 in src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContext.java

View check run for this annotation

ci.jenkins.io / Open Tasks Scanner

TODO

NORMAL: false probably more appropriate */true, new ExecutorStepExecution.RemovedNodeTimeoutCause());
} catch (CancellationException x) {
LOGGER.log(Level.FINE, "ceased to wait for " + node, x);
throw new FlowInterruptedException(Result.ABORTED, /* TODO false probably more appropriate */true, new ExecutorStepExecution.QueueTaskCancelled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import hudson.model.ResourceList;
import hudson.model.Result;
import hudson.model.Run;
import hudson.model.Slave;
import hudson.model.TaskListener;
import hudson.model.TopLevelItem;
import hudson.model.User;
Expand All @@ -38,6 +39,7 @@
import hudson.security.ACLContext;
import hudson.security.AccessControlled;
import hudson.security.Permission;
import hudson.slaves.AbstractCloudSlave;
import hudson.slaves.OfflineCause;
import hudson.slaves.WorkspaceList;
import java.io.IOException;
Expand Down Expand Up @@ -74,6 +76,7 @@
import org.acegisecurity.Authentication;
import org.jenkinsci.plugins.durabletask.executors.ContinuableExecutable;
import org.jenkinsci.plugins.durabletask.executors.ContinuedTask;
import org.jenkinsci.plugins.durabletask.executors.OnceRetentionStrategy;
import org.jenkinsci.plugins.workflow.actions.LabelAction;
import org.jenkinsci.plugins.workflow.actions.QueueItemAction;
import org.jenkinsci.plugins.workflow.actions.ThreadNameAction;
Expand Down Expand Up @@ -334,7 +337,7 @@ public void stop(@NonNull Throwable cause) throws Exception {

}

public static final class QueueTaskCancelled extends CauseOfInterruption {
public static final class QueueTaskCancelled extends RetryableCauseOfInterruption {
@Override public String getShortDescription() {
return Messages.ExecutorStepExecution_queue_task_cancelled();
}
Expand All @@ -346,51 +349,75 @@ public static final class QueueTaskCancelled extends CauseOfInterruption {
return;
}
LOGGER.fine(() -> "received node deletion event on " + node.getNodeName());
Timer.get().schedule(() -> {
Computer c = node.toComputer();
if (c == null || c.isOnline()) {
LOGGER.fine(() -> "computer for " + node.getNodeName() + " was missing or online, skipping");
return;
}
LOGGER.fine(() -> "processing node deletion event on " + node.getNodeName());
for (Executor e : c.getExecutors()) {
Queue.Executable exec = e.getCurrentExecutable();
if (exec instanceof PlaceholderTask.PlaceholderExecutable) {
PlaceholderTask task = ((PlaceholderTask.PlaceholderExecutable) exec).getParent();
TaskListener listener;
try {
listener = task.context.get(TaskListener.class);
} catch (Exception x) {
LOGGER.log(Level.WARNING, null, x);
continue;
}
task.withExecution(execution -> {
BodyExecution body = execution.body;
if (body == null) {
listener.getLogger().println("Agent " + node.getNodeName() + " was deleted, but do not have a node body to cancel");
return;
}
listener.getLogger().println("Agent " + node.getNodeName() + " was deleted; cancelling node body");
if (Util.isOverridden(BodyExecution.class, body.getClass(), "cancel", Throwable.class)) {
body.cancel(new FlowInterruptedException(Result.ABORTED, false, new RemovedNodeCause()));
} else { // TODO remove once https://github.com/jenkinsci/workflow-cps-plugin/pull/570 is widely deployed
body.cancel(new RemovedNodeCause());
}
});
if (isOneShotAgent(node)) {
LOGGER.fine(() -> "Cancelling owner run for one-shot agent " + node.getNodeName() + " immediately");
cancelOwnerExecution(node, new RemovedNodeCause());
} else {
LOGGER.fine(() -> "Will cancel owner run for agent " + node.getNodeName() + " after waiting for " + TIMEOUT_WAITING_FOR_NODE_MILLIS + "ms");
Timer.get().schedule(() -> cancelOwnerExecution(node, new RemovedNodeCause()), TIMEOUT_WAITING_FOR_NODE_MILLIS, TimeUnit.MILLISECONDS);
}
}

/**
 * Tells whether the given node is treated as a one-shot agent.
 * <p>A node qualifies when it is an {@link AbstractCloudSlave}, or when it is a {@link Slave}
 * whose retention strategy is an {@link OnceRetentionStrategy}.
 * The node-deletion handling uses this to cancel the owning execution immediately
 * rather than after waiting for {@code TIMEOUT_WAITING_FOR_NODE_MILLIS}.
 *
 * @param node the node that was deleted
 * @return {@code true} if the node matches one of the two cases above
 */
private static boolean isOneShotAgent(Node node) {
return node instanceof AbstractCloudSlave ||

Check warning on line 362 in src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java

View check run for this annotation

ci.jenkins.io / Code Coverage

Partially covered line

Line 362 is only partially covered, one branch is missing
(node instanceof Slave && ((Slave) node).getRetentionStrategy() instanceof OnceRetentionStrategy);
}

/**
 * Cancels the {@code node} block bodies that are currently running on the given node.
 * <p>Nothing is cancelled when the node has no computer or the computer is online; that case is only logged at FINE.
 * Otherwise every executor whose current executable is a {@link PlaceholderTask.PlaceholderExecutable} is visited.
 * If the task's {@link TaskListener} cannot be obtained, a warning is logged and that executor is skipped.
 * If the execution has no body, a message is printed to the build log and nothing is cancelled.
 * Otherwise the body is cancelled with a {@link FlowInterruptedException} ({@link Result#ABORTED}, not an actual
 * interruption) carrying {@code causes}, or via the older {@code cancel(CauseOfInterruption...)} overload
 * when the body class does not override {@code cancel(Throwable)}.
 *
 * @param node the node that was deleted
 * @param causes the causes to attach to the cancellation
 */
private static void cancelOwnerExecution(Node node, CauseOfInterruption... causes) {
Computer c = node.toComputer();
if (c == null || c.isOnline()) {
LOGGER.fine(() -> "computer for " + node.getNodeName() + " was missing or online, skipping");
return;
}
LOGGER.fine(() -> "processing node deletion event on " + node.getNodeName());
for (Executor e : c.getExecutors()) {
Queue.Executable exec = e.getCurrentExecutable();
if (exec instanceof PlaceholderTask.PlaceholderExecutable) {
PlaceholderTask task = ((PlaceholderTask.PlaceholderExecutable) exec).getParent();
TaskListener listener;
try {
listener = task.context.get(TaskListener.class);
} catch (Exception x) {
LOGGER.log(Level.WARNING, null, x);
continue;
}
task.withExecution(execution -> {
BodyExecution body = execution.body;
if (body == null) {
listener.getLogger().println("Agent " + node.getNodeName() + " was deleted, but do not have a node body to cancel");
return;
}
listener.getLogger().println("Agent " + node.getNodeName() + " was deleted; cancelling node body");
if (Util.isOverridden(BodyExecution.class, body.getClass(), "cancel", Throwable.class)) {
body.cancel(new FlowInterruptedException(Result.ABORTED, false, causes));
} else { // TODO remove once https://github.com/jenkinsci/workflow-cps-plugin/pull/570 is widely deployed

Check warning on line 393 in src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java

View check run for this annotation

ci.jenkins.io / Open Tasks Scanner

TODO

NORMAL: remove once https://github.com/jenkinsci/workflow-cps-plugin/pull/570 is widely deployed
body.cancel(causes);

Check warning on line 394 in src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java

View check run for this annotation

ci.jenkins.io / Code Coverage

Not covered line

Line 394 is not covered by tests
}
});
}
}, TIMEOUT_WAITING_FOR_NODE_MILLIS, TimeUnit.MILLISECONDS);
}
}
}

public static final class RemovedNodeCause extends CauseOfInterruption {
/**
 * Cause of interruption used when the agent running a {@code node} block was deleted.
 * It is passed to the body cancellation triggered by the node-deletion handling in this class.
 */
public static final class RemovedNodeCause extends RetryableCauseOfInterruption {
// Read once from the system property "<ExecutorStepExecution class name>.REMOVED_NODE_DETECTION"; defaults to true.
// Left non-final so it can be changed at runtime.
// NOTE(review): presumably gates the node-deletion handling; the check itself is not visible in this view.
@SuppressFBWarnings(value = "MS_SHOULD_BE_FINAL", justification = "deliberately mutable")
public static boolean ENABLED = Boolean.parseBoolean(System.getProperty(ExecutorStepExecution.class.getName() + ".REMOVED_NODE_DETECTION", "true"));
@Override public String getShortDescription() {
return "Agent was removed";
}
}

/**
 * Cause of interruption used when waiting for a missing agent exceeded
 * {@link ExecutorStepExecution#TIMEOUT_WAITING_FOR_NODE_MILLIS} and the build gives up on it.
 * Unlike {@link RemovedNodeCause}, it is reported only after that waiting period has elapsed.
 */
public static final class RemovedNodeTimeoutCause extends RetryableCauseOfInterruption {
@Override public String getShortDescription() {
return "Timeout waiting for agent to come back";
}
}

/**
 * Base class for a cause of interruption that can be retried via {@link AgentErrorCondition}.
 * <p>It adds no behavior of its own: it combines {@link CauseOfInterruption} with the
 * {@link AgentErrorCondition.Retryable} marker so that each subclass carries the marker.
 */
private abstract static class RetryableCauseOfInterruption extends CauseOfInterruption implements AgentErrorCondition.Retryable {}

/** Transient handle of a running executor task. */
private static final class RunningTask {
/** null until placeholder executable runs */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,38 @@

package org.jenkinsci.plugins.workflow.support.steps;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import hudson.model.Label;
import hudson.model.Queue;
import hudson.model.Result;
import hudson.slaves.DumbSlave;
import hudson.slaves.RetentionStrategy;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import jenkins.model.InterruptedBuildAction;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.anyOf;
import org.jenkinci.plugins.mock_slave.MockCloud;
import org.jenkinsci.plugins.durabletask.executors.OnceRetentionStrategy;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.flow.FlowExecutionList;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.jvnet.hudson.test.BuildWatcher;
import org.jvnet.hudson.test.Issue;
import org.jvnet.hudson.test.JenkinsSessionRule;
Expand All @@ -60,9 +65,12 @@ public class ExecutorStepDynamicContextTest {

@ClassRule public static BuildWatcher buildWatcher = new BuildWatcher();
@Rule public JenkinsSessionRule sessions = new JenkinsSessionRule();
@Rule public TemporaryFolder tmp = new TemporaryFolder();
@Rule public LoggerRule logging = new LoggerRule();

/** Turns on FINE logging for the package of {@link ExecutorStepExecution} and for {@link FlowExecutionList}. */
private void commonSetup() {
    logging.recordPackage(ExecutorStepExecution.class, Level.FINE);
    logging.record(FlowExecutionList.class, Level.FINE);
}

@Test public void canceledQueueItem() throws Throwable {
sessions.then(j -> {
DumbSlave s = j.createSlave(Label.get("remote"));
Expand All @@ -75,11 +83,7 @@ public class ExecutorStepDynamicContextTest {
sessions.then(j -> {
SemaphoreStep.success("wait/1", null);
WorkflowRun b = j.jenkins.getItemByFullName("p", WorkflowJob.class).getBuildByNumber(1);
while (Queue.getInstance().getItems().length == 0) {
Thread.sleep(100);
}
Queue.Item[] items = Queue.getInstance().getItems();
assertEquals(1, items.length);
var items = await().timeout(Duration.ofMinutes(1)).until(() -> j.jenkins.getQueue().getItems(), arrayWithSize(1));
Queue.getInstance().cancel(items[0]);
j.assertBuildStatus(Result.ABORTED, j.waitForCompletion(b));
InterruptedBuildAction iba = b.getAction(InterruptedBuildAction.class);
Expand All @@ -98,7 +102,7 @@ public class ExecutorStepDynamicContextTest {
*/
@Issue("JENKINS-36013")
@Test public void normalNodeDisappearance() throws Throwable {
logging.recordPackage(ExecutorStepExecution.class, Level.FINE).record(FlowExecutionList.class, Level.FINE);
commonSetup();
sessions.then(j -> {
// Start up a build that needs executor and then reboot and take the node offline
// Starting job first ensures we don't immediately fail if Node comes from a Cloud
Expand All @@ -114,20 +118,19 @@ public class ExecutorStepDynamicContextTest {
sessions.then(j -> {
// After the restart the agent no longer exists, so the build should give up waiting for it and abort
assertEquals(0, j.jenkins.getLabel("ghost").getNodes().size()); // Make sure test impl is correctly deleted
assertNull(j.jenkins.getNode("ghost")); // Make sure test impl is correctly deleted
WorkflowRun run = j.jenkins.getItemByFullName("p", WorkflowJob.class).getLastBuild();
j.assertBuildStatus(Result.ABORTED, j.waitForCompletion(run));
j.assertLogContains("slave0 has been removed for ", run);
assertThat(j.jenkins.getQueue().getItems(), emptyArray());
InterruptedBuildAction iba = run.getAction(InterruptedBuildAction.class);
assertNotNull(iba);
assertThat(iba.getCauses(), contains(isA(ExecutorStepExecution.RemovedNodeCause.class)));
assertThat(iba.getCauses(), contains(isA(ExecutorStepExecution.RemovedNodeTimeoutCause.class)));
});
}

@Issue("JENKINS-36013")
@Test public void parallelNodeDisappearance() throws Throwable {
logging.recordPackage(ExecutorStepExecution.class, Level.FINE).record(FlowExecutionList.class, Level.FINE);
commonSetup();
sessions.then(j -> {
WorkflowJob p = j.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("def bs = [:]; for (int _i = 0; _i < 5; _i++) {def i = _i; bs[/b$i/] = {node('remote') {semaphore(/s$i/)}}}; parallel bs", true));
Expand Down Expand Up @@ -207,4 +210,43 @@ public class ExecutorStepDynamicContextTest {
});
}

/**
 * Removes an agent configured with {@link OnceRetentionStrategy} while a long-running shell step
 * is executing on it, then expects the build to be aborted with a
 * {@link ExecutorStepExecution.RemovedNodeCause}.
 */
@Test public void onceRetentionStrategyNodeDisappearance() throws Throwable {
    commonSetup();
    sessions.then(j -> {
        // Agent labelled "ghost" with a once-only retention strategy.
        DumbSlave agent = j.createSlave(Label.get("ghost"));
        agent.setRetentionStrategy(new OnceRetentionStrategy(0));

        WorkflowJob job = j.createProject(WorkflowJob.class, "p");
        job.setDefinition(new CpsFlowDefinition(
                "node('ghost') {if (isUnix()) {sh 'sleep infinity'} else {bat 'echo + sleep infinity && ping -n 999999 localhost'}}",
                true));
        WorkflowRun build = job.scheduleBuild2(0).waitForStart();

        // Wait until the step is actually running on the agent before deleting it.
        j.waitForMessage("+ sleep infinity", build);
        j.jenkins.removeNode(agent);

        j.assertBuildStatus(Result.ABORTED, j.waitForCompletion(build));
        assertThat(j.jenkins.getQueue().getItems(), emptyArray());
        InterruptedBuildAction interruption = build.getAction(InterruptedBuildAction.class);
        assertNotNull(interruption);
        assertThat(interruption.getCauses(), contains(isA(ExecutorStepExecution.RemovedNodeCause.class)));
    });
}

/**
 * Removes a cloud-provisioned agent (from {@link MockCloud}) while a long-running shell step
 * is executing on it, then expects the build to be aborted with a
 * {@link ExecutorStepExecution.RemovedNodeCause}.
 */
@Test public void cloudNodeDisappearance() throws Throwable {
    commonSetup();
    sessions.then(j -> {
        // Register a mock cloud that serves the "mock" label.
        MockCloud cloud = new MockCloud("mock");
        cloud.setLabels("mock");
        j.jenkins.clouds.add(cloud);

        WorkflowJob job = j.createProject(WorkflowJob.class, "p");
        job.setDefinition(new CpsFlowDefinition(
                "node('mock') {if (isUnix()) {sh 'sleep infinity'} else {bat 'echo + sleep infinity && ping -n 999999 localhost'}}",
                true));
        var build = job.scheduleBuild2(0).waitForStart();

        // Wait until the step is actually running on the provisioned agent.
        j.waitForMessage("+ sleep infinity", build);

        // Exactly one agent should carry the label; delete it.
        var provisioned = j.jenkins.getLabel("mock").getNodes();
        assertThat(provisioned, hasSize(1));
        j.jenkins.removeNode(provisioned.iterator().next());

        j.assertBuildStatus(Result.ABORTED, j.waitForCompletion(build));
        assertThat(j.jenkins.getQueue().getItems(), emptyArray());
        InterruptedBuildAction interruption = build.getAction(InterruptedBuildAction.class);
        assertNotNull(interruption);
        assertThat(interruption.getCauses(), contains(isA(ExecutorStepExecution.RemovedNodeCause.class)));
    });
}
}

0 comments on commit 1891ab0

Please sign in to comment.