Skip to content

Commit

Permalink
Allow the stuck build to be cleaned up promptly at the end of the test
Browse files Browse the repository at this point in the history
  • Loading branch information
dwnusbaum committed Aug 9, 2024
1 parent 7100611 commit 2de670d
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 23 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>scm-api</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-job</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,10 @@ public ListenableFuture<?> apply(final Function<StepExecution, Void> f) {

for (FlowExecution e : FlowExecutionList.get()) {
ListenableFuture<List<StepExecution>> execs = e.getCurrentExecutions(false);
// We transform the futures that return List<StepExecution> into futures that return Void before
// passing them to Futures.allAsList so that the combined future only holds strong references to each
// FlowExecution until its StepExecutions have been loaded and applied to the function. This prevents
// us from leaking references to all processed executions in the case where a single build has a stuck
// CpsVmExecutorService that prevents the future returned by getCurrentExecutions from completing.
// It is important that the combined future's return values do not reference the individual step
// executions, so we use transform instead of addCallback. Otherwise, it is possible to leak references
// to the WorkflowRun for each processed StepExecution in the case where a single live FlowExecution
// has a stuck CpsVmExecutorService that prevents the getCurrentExecutions future from completing.
ListenableFuture<Void> results = Futures.transform(execs, (List<StepExecution> result) -> {
for (StepExecution se : result) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package org.jenkinsci.plugins.workflow.flow;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
Expand All @@ -39,21 +40,21 @@
import hudson.model.queue.QueueTaskFuture;
import java.io.Serializable;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import jenkins.model.Jenkins;
import org.hamcrest.Matcher;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.jenkinsci.plugins.workflow.steps.Step;
import org.jenkinsci.plugins.workflow.steps.StepContext;
import org.jenkinsci.plugins.workflow.steps.StepDescriptor;
import org.jenkinsci.plugins.workflow.steps.StepExecution;
import org.jenkinsci.plugins.workflow.steps.StepExecutions;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -135,7 +136,7 @@ public class FlowExecutionListTest {
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175)
at jenkins.model.Jenkins.<init>(Jenkins.java:1019)
*/
waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep")));
await().atMost(5, TimeUnit.SECONDS).until(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep")));
WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class);
SemaphoreStep.success("wait/1", null);
WorkflowRun b = p.getBuildByNumber(1);
Expand Down Expand Up @@ -166,26 +167,28 @@ public class FlowExecutionListTest {
@Test public void stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck() throws Throwable {
sessions.then(r -> {
var notStuck = r.createProject(WorkflowJob.class, "not-stuck");
notStuck.setDefinition(new CpsFlowDefinition("semaphore('wait')", true));
notStuck.setDefinition(new CpsFlowDefinition("semaphore 'wait'", true));
var notStuckBuild = notStuck.scheduleBuild2(0).waitForStart();
SemaphoreStep.waitForStart("wait/1", notStuckBuild);
WeakReference<Object> notStuckBuildRef = new WeakReference<>(notStuckBuild);
// Create a Pipeline that runs a long-lived task on its CpsVmExecutorService, causing it to get stuck.
var stuck = r.createProject(WorkflowJob.class, "stuck");
stuck.setDefinition(new CpsFlowDefinition("echo 'test message'; Thread.sleep(Integer.MAX_VALUE)", false));
stuck.setDefinition(new CpsFlowDefinition("blockSynchronously 'stuck'", false));
var stuckBuild = stuck.scheduleBuild2(0).waitForStart();
r.waitForMessage("test message", stuckBuild);
Thread.sleep(1000); // We need Thread.sleep to be running in the CpsVmExecutorService.
await().atMost(5, TimeUnit.SECONDS).until(() -> SynchronousBlockingStep.isStarted("stuck"));
// Make FlowExecutionList$StepExecutionIteratorImpl.applyAll submit a task to the CpsVmExecutorService
// for stuck #1 that will never complete, so the resulting future will never complete.
StepExecution.applyAll(e -> null);
// Let notStuckBuild complete and check that it can be GC'd.
// Let notStuckBuild complete and clean up all references.
SemaphoreStep.success("wait/1", null);
r.waitForCompletion(notStuckBuild);
notStuckBuild = null; // Clear out the local variable in this thread.
Jenkins.get().getQueue().clearLeftItems(); // We don't want to wait 5 minutes.
Jenkins.get().getQueue().clearLeftItems(); // Otherwise we'd have to wait 5 minutes for the cache to be cleared.
// Make sure that the reference can be GC'd.
MemoryAssert.assertGC(notStuckBuildRef, true);
// TODO: Test cleanup hangs for 1 minute in CpsFlowExecution.suspendAll because the checkpoint task can't run.
// Allow stuck #1 to complete so the test can be cleaned up promptly.
SynchronousBlockingStep.unblock("stuck");
r.waitForCompletion(stuckBuild);
});
}

Expand Down Expand Up @@ -227,14 +230,59 @@ public String getFunctionName() {
}

/**
* Wait up to 5 seconds for the given supplier to return a matching value.
* Blocks the CPS VM thread synchronously (bad!) to test related problems.
*/
private static <T> void waitFor(Supplier<T> valueSupplier, Matcher<T> matcher) throws InterruptedException {
Instant end = Instant.now().plus(Duration.ofSeconds(5));
while (!matcher.matches(valueSupplier.get()) && Instant.now().isBefore(end)) {
Thread.sleep(100L);
public static class SynchronousBlockingStep extends Step implements Serializable {
private static final long serialVersionUID = 1L;
private static final Map<String, State> blocked = new HashMap<>();
private final String id;

@DataBoundConstructor
public SynchronousBlockingStep(String id) {
this.id = id;
if (blocked.put(id, State.NOT_STARTED) != null) {
throw new IllegalArgumentException("Attempting to reuse ID: " + id);
}
}

@Override
public StepExecution start(StepContext context) throws Exception {
return StepExecutions.synchronous(context, c -> {
blocked.put(id, State.BLOCKED);
c.get(TaskListener.class).getLogger().println(id + " blocked");
while (blocked.get(id) == State.BLOCKED) {
Thread.sleep(100L);
}
c.get(TaskListener.class).getLogger().println(id + " unblocked ");
return null;
});
}

public static boolean isStarted(String id) {
var state = blocked.get(id);
return state != null && state != State.NOT_STARTED;
}

public static void unblock(String id) {
blocked.put(id, State.UNBLOCKED);
}

private enum State {
NOT_STARTED,
BLOCKED,
UNBLOCKED,
}

@TestExtension public static class DescriptorImpl extends StepDescriptor {
@Override
public Set<? extends Class<?>> getRequiredContext() {
return Collections.singleton(TaskListener.class);
}
@Override
public String getFunctionName() {
return "blockSynchronously";
}
}
assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher);
}

}

0 comments on commit 2de670d

Please sign in to comment.