diff --git a/pom.xml b/pom.xml index 81360cb8..9e74e5a0 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,12 @@ org.jenkins-ci.plugins scm-api + + org.awaitility + awaitility + 4.2.2 + test + org.jenkins-ci.plugins.workflow workflow-job diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java index 4a6c49a6..1aea2847 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java @@ -253,11 +253,10 @@ public ListenableFuture apply(final Function f) { for (FlowExecution e : FlowExecutionList.get()) { ListenableFuture> execs = e.getCurrentExecutions(false); - // We transform the futures that return List into futures that return Void before - // passing them to Futures.allAsList so that the combined future only holds strong references to each - // FlowExecution until its StepExecutions have been loaded and applied to the function. This prevents - // us from leaking references to all processed executions in the case where a single build has a stuck - // CpsVmExecutorService that prevents the future returned by getCurrentExecutions from completing. + // It is important that the combined future's return values do not reference the individual step + // executions, so we use transform instead of addCallback. Otherwise, it is possible to leak references + // to the WorkflowRun for each processed StepExecution in the case where a single live FlowExecution + // has a stuck CpsVmExecutorService that prevents the getCurrentExecutions future from completing. ListenableFuture results = Futures.transform(execs, (List result) -> { for (StepExecution se : result) { try { diff --git a/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java b/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java index 86309d01..37acb052 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java @@ -24,6 +24,7 @@ package org.jenkinsci.plugins.workflow.flow; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasItem; @@ -39,14 +40,13 @@ import hudson.model.queue.QueueTaskFuture; import java.io.Serializable; import java.lang.ref.WeakReference; -import java.time.Duration; -import java.time.Instant; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; -import java.util.function.Supplier; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import jenkins.model.Jenkins; -import org.hamcrest.Matcher; import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; import org.jenkinsci.plugins.workflow.job.WorkflowJob; import org.jenkinsci.plugins.workflow.job.WorkflowRun; @@ -54,6 +54,7 @@ import org.jenkinsci.plugins.workflow.steps.StepContext; import org.jenkinsci.plugins.workflow.steps.StepDescriptor; import org.jenkinsci.plugins.workflow.steps.StepExecution; +import org.jenkinsci.plugins.workflow.steps.StepExecutions; import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep; import org.junit.ClassRule; import org.junit.Test; @@ -135,7 +136,7 @@ public class FlowExecutionListTest { at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175) at jenkins.model.Jenkins.(Jenkins.java:1019) */ - waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep"))); + await().atMost(5, TimeUnit.SECONDS).until(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep"))); WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class); SemaphoreStep.success("wait/1", null); WorkflowRun b = p.getBuildByNumber(1); @@ -166,26 +167,28 @@ public class FlowExecutionListTest { @Test public void stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck() throws Throwable { sessions.then(r -> { var notStuck = r.createProject(WorkflowJob.class, "not-stuck"); - notStuck.setDefinition(new CpsFlowDefinition("semaphore('wait')", true)); + notStuck.setDefinition(new CpsFlowDefinition("semaphore 'wait'", true)); var notStuckBuild = notStuck.scheduleBuild2(0).waitForStart(); SemaphoreStep.waitForStart("wait/1", notStuckBuild); WeakReference notStuckBuildRef = new WeakReference<>(notStuckBuild); // Create a Pipeline that runs a long-lived task on its CpsVmExecutorService, causing it to get stuck. var stuck = r.createProject(WorkflowJob.class, "stuck"); - stuck.setDefinition(new CpsFlowDefinition("echo 'test message'; Thread.sleep(Integer.MAX_VALUE)", false)); + stuck.setDefinition(new CpsFlowDefinition("blockSynchronously 'stuck'", false)); var stuckBuild = stuck.scheduleBuild2(0).waitForStart(); - r.waitForMessage("test message", stuckBuild); - Thread.sleep(1000); // We need Thread.sleep to be running in the CpsVmExecutorService. + await().atMost(5, TimeUnit.SECONDS).until(() -> SynchronousBlockingStep.isStarted("stuck")); // Make FlowExecutionList$StepExecutionIteratorImpl.applyAll submit a task to the CpsVmExecutorService // for stuck #1 that will never complete, so the resulting future will never complete. StepExecution.applyAll(e -> null); - // Let notStuckBuild complete and check that it can be GC'd. + // Let notStuckBuild complete and clean up all references. SemaphoreStep.success("wait/1", null); r.waitForCompletion(notStuckBuild); notStuckBuild = null; // Clear out the local variable in this thread. - Jenkins.get().getQueue().clearLeftItems(); // We don't want to wait 5 minutes. + Jenkins.get().getQueue().clearLeftItems(); // Otherwise we'd have to wait 5 minutes for the cache to be cleared. + // Make sure that the reference can be GC'd. MemoryAssert.assertGC(notStuckBuildRef, true); - // TODO: Test cleanup hangs for 1 minute in CpsFlowExecution.suspendAll because the checkpoint task can't run. + // Allow stuck #1 to complete so the test can be cleaned up promptly. + SynchronousBlockingStep.unblock("stuck"); + r.waitForCompletion(stuckBuild); }); } @@ -227,14 +230,59 @@ public String getFunctionName() { } /** - * Wait up to 5 seconds for the given supplier to return a matching value. + * Blocks the CPS VM thread synchronously (bad!) to test related problems. */ - private static void waitFor(Supplier valueSupplier, Matcher matcher) throws InterruptedException { - Instant end = Instant.now().plus(Duration.ofSeconds(5)); - while (!matcher.matches(valueSupplier.get()) && Instant.now().isBefore(end)) { - Thread.sleep(100L); + public static class SynchronousBlockingStep extends Step implements Serializable { + private static final long serialVersionUID = 1L; + private static final Map blocked = new HashMap<>(); + private final String id; + + @DataBoundConstructor + public SynchronousBlockingStep(String id) { + this.id = id; + if (blocked.put(id, State.NOT_STARTED) != null) { + throw new IllegalArgumentException("Attempting to reuse ID: " + id); + } + } + + @Override + public StepExecution start(StepContext context) throws Exception { + return StepExecutions.synchronous(context, c -> { + blocked.put(id, State.BLOCKED); + c.get(TaskListener.class).getLogger().println(id + " blocked"); + while (blocked.get(id) == State.BLOCKED) { + Thread.sleep(100L); + } + c.get(TaskListener.class).getLogger().println(id + " unblocked "); + return null; + }); + } + + public static boolean isStarted(String id) { + var state = blocked.get(id); + return state != null && state != State.NOT_STARTED; + } + + public static void unblock(String id) { + blocked.put(id, State.UNBLOCKED); + } + + private enum State { + NOT_STARTED, + BLOCKED, + UNBLOCKED, + } + + @TestExtension public static class DescriptorImpl extends StepDescriptor { + @Override + public Set> getRequiredContext() { + return Collections.singleton(TaskListener.class); + } + @Override + public String getFunctionName() { + return "blockSynchronously"; + } } - assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher); } }