diff --git a/pom.xml b/pom.xml index 81360cb8..9e74e5a0 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,12 @@ org.jenkins-ci.plugins scm-api + + org.awaitility + awaitility + 4.2.2 + test + org.jenkins-ci.plugins.workflow workflow-job diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java index e0a0809f..1aea2847 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java @@ -253,24 +253,25 @@ public ListenableFuture apply(final Function f) { for (FlowExecution e : FlowExecutionList.get()) { ListenableFuture> execs = e.getCurrentExecutions(false); - all.add(execs); - Futures.addCallback(execs,new FutureCallback>() { - @Override - public void onSuccess(@NonNull List result) { - for (StepExecution e : result) { - try { - f.apply(e); - } catch (RuntimeException x) { - LOGGER.log(Level.WARNING, null, x); - } + // It is important that the combined future's return values do not reference the individual step + // executions, so we use transform instead of addCallback. Otherwise, it is possible to leak references + // to the WorkflowRun for each processed StepExecution in the case where a single live FlowExecution + // has a stuck CpsVmExecutorService that prevents the getCurrentExecutions future from completing. + ListenableFuture results = Futures.transform(execs, (List result) -> { + for (StepExecution se : result) { + try { + f.apply(se); + } catch (RuntimeException x) { + LOGGER.log(Level.WARNING, null, x); } } - - @Override - public void onFailure(@NonNull Throwable t) { - LOGGER.log(Level.WARNING, null, t); - } + return null; + }, MoreExecutors.directExecutor()); + ListenableFuture resultsWithWarningsLogged = Futures.catching(results, Throwable.class, t -> { + LOGGER.log(Level.WARNING, null, t); + return null; }, MoreExecutors.directExecutor()); + all.add(resultsWithWarningsLogged); } return Futures.allAsList(all); diff --git a/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java b/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java index 8cf59f30..9b9ee501 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java @@ -24,6 +24,7 @@ package org.jenkinsci.plugins.workflow.flow; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasItem; @@ -38,13 +39,14 @@ import hudson.model.TaskListener; import hudson.model.queue.QueueTaskFuture; import java.io.Serializable; -import java.time.Duration; -import java.time.Instant; +import java.lang.ref.WeakReference; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; -import java.util.function.Supplier; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; -import org.hamcrest.Matcher; +import jenkins.model.Jenkins; import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; import org.jenkinsci.plugins.workflow.job.WorkflowJob; import org.jenkinsci.plugins.workflow.job.WorkflowRun; @@ -52,6 +54,7 @@ import org.jenkinsci.plugins.workflow.steps.StepContext; import org.jenkinsci.plugins.workflow.steps.StepDescriptor; import org.jenkinsci.plugins.workflow.steps.StepExecution; +import org.jenkinsci.plugins.workflow.steps.StepExecutions; import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep; import org.junit.ClassRule; import org.junit.Test; @@ -60,6 +63,7 @@ import org.jvnet.hudson.test.Issue; import org.jvnet.hudson.test.LoggerRule; import org.jvnet.hudson.test.JenkinsSessionRule; +import org.jvnet.hudson.test.MemoryAssert; import org.jvnet.hudson.test.TestExtension; import org.kohsuke.stapler.DataBoundConstructor; @@ -132,7 +136,7 @@ public class FlowExecutionListTest { at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175) at jenkins.model.Jenkins.(Jenkins.java:1019) */ - waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep"))); + await().atMost(5, TimeUnit.SECONDS).until(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep"))); WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class); SemaphoreStep.success("wait/1", null); WorkflowRun b = p.getBuildByNumber(1); @@ -160,6 +164,34 @@ public class FlowExecutionListTest { }); } + @Test public void stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck() throws Throwable { + sessions.then(r -> { + var notStuck = r.createProject(WorkflowJob.class, "not-stuck"); + notStuck.setDefinition(new CpsFlowDefinition("semaphore 'wait'", true)); + var notStuckBuild = notStuck.scheduleBuild2(0).waitForStart(); + SemaphoreStep.waitForStart("wait/1", notStuckBuild); + WeakReference notStuckBuildRef = new WeakReference<>(notStuckBuild); + // Create a Pipeline that runs a long-lived task on its CpsVmExecutorService, causing it to get stuck. + var stuck = r.createProject(WorkflowJob.class, "stuck"); + stuck.setDefinition(new CpsFlowDefinition("blockSynchronously 'stuck'", false)); + var stuckBuild = stuck.scheduleBuild2(0).waitForStart(); + await().atMost(5, TimeUnit.SECONDS).until(() -> SynchronousBlockingStep.isStarted("stuck")); + // Make FlowExecutionList$StepExecutionIteratorImpl.applyAll submit a task to the CpsVmExecutorService + // for stuck #1 that will never complete, so the resulting future will never complete. + StepExecution.applyAll(e -> null); + // Let notStuckBuild complete and clean up all references. + SemaphoreStep.success("wait/1", null); + r.waitForCompletion(notStuckBuild); + notStuckBuild = null; // Clear out the local variable in this thread. + Jenkins.get().getQueue().clearLeftItems(); // Otherwise we'd have to wait 5 minutes for the cache to be cleared. + // Make sure that the reference can be GC'd. + MemoryAssert.assertGC(notStuckBuildRef, true); + // Allow stuck #1 to complete so the test can be cleaned up promptly. + SynchronousBlockingStep.unblock("stuck"); + r.waitForCompletion(stuckBuild); + }); + } + public static class NonResumableStep extends Step implements Serializable { public static final long serialVersionUID = 1L; @DataBoundConstructor @@ -198,14 +230,59 @@ public String getFunctionName() { } /** - * Wait up to 5 seconds for the given supplier to return a matching value. + * Blocks the CPS VM thread synchronously (bad!) to test related problems. */ - private static void waitFor(Supplier valueSupplier, Matcher matcher) throws InterruptedException { - Instant end = Instant.now().plus(Duration.ofSeconds(5)); - while (!matcher.matches(valueSupplier.get()) && Instant.now().isBefore(end)) { - Thread.sleep(100L); + public static class SynchronousBlockingStep extends Step implements Serializable { + private static final long serialVersionUID = 1L; + private static final Map blocked = new HashMap<>(); + private final String id; + + @DataBoundConstructor + public SynchronousBlockingStep(String id) { + this.id = id; + if (blocked.put(id, State.NOT_STARTED) != null) { + throw new IllegalArgumentException("Attempting to reuse ID: " + id); + } + } + + @Override + public StepExecution start(StepContext context) throws Exception { + return StepExecutions.synchronous(context, c -> { + blocked.put(id, State.BLOCKED); + c.get(TaskListener.class).getLogger().println(id + " blocked"); + while (blocked.get(id) == State.BLOCKED) { + Thread.sleep(100L); + } + c.get(TaskListener.class).getLogger().println(id + " unblocked "); + return null; + }); + } + + public static boolean isStarted(String id) { + var state = blocked.get(id); + return state != null && state != State.NOT_STARTED; + } + + public static void unblock(String id) { + blocked.put(id, State.UNBLOCKED); + } + + private enum State { + NOT_STARTED, + BLOCKED, + UNBLOCKED, + } + + @TestExtension("stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck") public static class DescriptorImpl extends StepDescriptor { + @Override + public Set> getRequiredContext() { + return Collections.singleton(TaskListener.class); + } + @Override + public String getFunctionName() { + return "blockSynchronously"; + } } - assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher); } }